aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-02-21 13:06:58 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-02-21 13:06:58 +0000
commit63702f1cfcee53e2433c10216912b4ef739a6086 (patch)
treeed0b8aa339bf89f3fc4c9e45c4cec29937de5445
parent813df553bcf33a6fe161a22b6f6b93b2780308b5 (diff)
stop using FastOS_Thread in vespalib
-rw-r--r--vespalib/src/tests/clock/clock_benchmark.cpp104
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.h1
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp33
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h11
4 files changed, 57 insertions, 92 deletions
diff --git a/vespalib/src/tests/clock/clock_benchmark.cpp b/vespalib/src/tests/clock/clock_benchmark.cpp
index a21ad2b05ef..620b4e5c83c 100644
--- a/vespalib/src/tests/clock/clock_benchmark.cpp
+++ b/vespalib/src/tests/clock/clock_benchmark.cpp
@@ -2,7 +2,6 @@
#include <vespa/vespalib/util/clock.h>
#include <vespa/vespalib/util/invokeserviceimpl.h>
-#include <vespa/fastos/thread.h>
#include <cassert>
#include <vector>
#include <atomic>
@@ -10,6 +9,7 @@
#include <cstring>
#include <condition_variable>
#include <mutex>
+#include <thread>
using vespalib::Clock;
using vespalib::steady_time;
@@ -36,7 +36,7 @@ struct NSAtomic : public UpdateClock {
std::atomic<int64_t> _value;
};
-class TestClock : public FastOS_Runnable
+class TestClock
{
private:
int _timePeriodMS;
@@ -44,8 +44,9 @@ private:
std::condition_variable _cond;
UpdateClock &_clock;
bool _stop;
+ std::thread _thread;
- void Run(FastOS_ThreadInterface *thisThread, void *arguments) override;
+ void run();
public:
TestClock(UpdateClock & clock, double timePeriod)
@@ -53,74 +54,68 @@ public:
_lock(),
_cond(),
_clock(clock),
- _stop(false)
- { }
+ _stop(false),
+ _thread()
+ {
+ _thread = std::thread([this](){run();});
+ }
~TestClock() {
- std::lock_guard<std::mutex> guard(_lock);
- _stop = true;
- _cond.notify_all();
+ {
+ std::lock_guard<std::mutex> guard(_lock);
+ _stop = true;
+ _cond.notify_all();
+ }
+ _thread.join();
}
};
-void TestClock::Run(FastOS_ThreadInterface *thread, void *)
+void TestClock::run()
{
std::unique_lock<std::mutex> guard(_lock);
- while ( ! thread->GetBreakFlag() && !_stop) {
+ while (!_stop) {
_clock.update();
_cond.wait_for(guard, std::chrono::milliseconds(_timePeriodMS));
}
}
-struct SamplerBase : public FastOS_Runnable {
- SamplerBase(uint32_t threadId)
- : _thread(nullptr),
- _threadId(threadId),
- _samples(0),
- _count()
+template<typename Func>
+struct Sampler {
+ Sampler(Func func, uint64_t samples)
+ : _samples(samples),
+ _count(),
+ _func(func),
+ _thread()
{
memset(_count, 0, sizeof(_count));
+ _thread = std::thread([this](){run();});
}
- FastOS_ThreadInterface * _thread;
- uint32_t _threadId;
- uint64_t _samples;
- uint64_t _count[3];
-};
-
-template<typename Func>
-struct Sampler : public SamplerBase {
- Sampler(Func func, uint32_t threadId)
- : SamplerBase(threadId),
- _func(func)
- { }
- void Run(FastOS_ThreadInterface *, void *) override {
- uint64_t samples;
+ void run() {
steady_time prev = _func();
- for (samples = 0; (samples < _samples); samples++) {
+ for (uint64_t samples = 0; samples < _samples; ++samples) {
steady_time now = _func();
duration diff = now - prev;
if (diff > duration::zero()) prev = now;
_count[1 + ((diff == duration::zero()) ? 0 : (diff > duration::zero()) ? 1 : -1)]++;
}
-
}
- Func _func;
+ uint64_t _samples;
+ uint64_t _count[3];
+ Func _func;
+ std::thread _thread;
};
template<typename Func>
-void benchmark(const char * desc, FastOS_ThreadPool & pool, uint64_t samples, uint32_t numThreads, Func func) {
- std::vector<std::unique_ptr<SamplerBase>> threads;
+void benchmark(const char * desc, uint64_t samples, uint32_t numThreads, Func func) {
+ std::vector<std::unique_ptr<Sampler<Func>>> threads;
threads.reserve(numThreads);
steady_time start = steady_clock::now();
for (uint32_t i(0); i < numThreads; i++) {
- SamplerBase * sampler = new Sampler<Func>(func, i);
- sampler->_samples = samples;
- sampler->_thread = pool.NewThread(sampler, nullptr);
- threads.emplace_back(sampler);
+ threads.push_back(std::make_unique<Sampler<Func>>(func, samples));
}
uint64_t count[3];
memset(count, 0, sizeof(count));
for (const auto & sampler : threads) {
- sampler->_thread->Join();
+ sampler->_thread.join();
for (uint32_t i(0); i < 3; i++) {
count[i] += sampler->_count[i];
}
@@ -129,12 +124,15 @@ void benchmark(const char * desc, FastOS_ThreadPool & pool, uint64_t samples, ui
}
int
-main(int , char *argv[])
+main(int argc, char *argv[])
{
+ if (argc != 4) {
+ fprintf(stderr, "usage: %s <frequency> <numThreads> <samples>\n", argv[0]);
+ return 1;
+ }
uint64_t frequency = atoll(argv[1]);
uint32_t numThreads = atoi(argv[2]);
uint64_t samples = atoll(argv[3]);
- FastOS_ThreadPool pool;
NSValue nsValue;
NSVolatile nsVolatile;
NSAtomic nsAtomic;
@@ -143,36 +141,30 @@ main(int , char *argv[])
TestClock nsClock(nsValue, 1.0/frequency);
TestClock nsVolatileClock(nsVolatile, 1.0/frequency);
TestClock nsAtomicClock(nsAtomic, 1.0/frequency);
- assert(pool.NewThread(&nsClock, nullptr) != nullptr);
- assert(pool.NewThread(&nsVolatileClock, nullptr) != nullptr);
- assert(pool.NewThread(&nsAtomicClock, nullptr) != nullptr);
- benchmark("vespalib::Clock", pool, samples, numThreads, [&clock]() {
+ benchmark("vespalib::Clock", samples, numThreads, [&clock]() {
return clock.getTimeNS();
});
- benchmark("uint64_t", pool, samples, numThreads, [&nsValue]() {
+ benchmark("uint64_t", samples, numThreads, [&nsValue]() {
return steady_time (duration(nsValue._value));
});
- benchmark("volatile uint64_t", pool, samples, numThreads, [&nsVolatile]() {
+ benchmark("volatile uint64_t", samples, numThreads, [&nsVolatile]() {
return steady_time(duration(nsVolatile._value));
});
- benchmark("memory_order_relaxed", pool, samples, numThreads, [&nsAtomic]() {
+ benchmark("memory_order_relaxed", samples, numThreads, [&nsAtomic]() {
return steady_time(duration(nsAtomic._value.load(std::memory_order_relaxed)));
});
- benchmark("memory_order_consume", pool, samples, numThreads, [&nsAtomic]() {
+ benchmark("memory_order_consume", samples, numThreads, [&nsAtomic]() {
return steady_time(duration(nsAtomic._value.load(std::memory_order_consume)));
});
- benchmark("memory_order_acquire", pool, samples, numThreads, [&nsAtomic]() {
+ benchmark("memory_order_acquire", samples, numThreads, [&nsAtomic]() {
return steady_time(duration(nsAtomic._value.load(std::memory_order_acquire)));
});
- benchmark("memory_order_seq_cst", pool, samples, numThreads, [&nsAtomic]() {
+ benchmark("memory_order_seq_cst", samples, numThreads, [&nsAtomic]() {
return steady_time(duration(nsAtomic._value.load(std::memory_order_seq_cst)));
});
-
- benchmark("vespalib::steady_time::now()", pool, samples, numThreads, []() {
+ benchmark("vespalib::steady_time::now()", samples, numThreads, []() {
return steady_clock::now();
});
-
- pool.Close();
return 0;
}
diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h
index d919b8f2ab7..f331f1870c7 100644
--- a/vespalib/src/vespa/vespalib/util/thread.h
+++ b/vespalib/src/vespa/vespalib/util/thread.h
@@ -32,6 +32,7 @@ public:
_threads.reserve(_threads.size() + 1);
_threads.emplace_back(std::forward<F>(f), std::forward<Args>(args)...);
};
+ size_t size() const { return _threads.size(); }
void join() {
for (auto &thread: _threads) {
thread.join();
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index 8b6427d9391..8af14366293 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -1,29 +1,9 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "threadstackexecutorbase.h"
-#include <vespa/fastos/thread.h>
namespace vespalib {
-namespace thread {
-
-struct ThreadInit : public FastOS_Runnable {
- Runnable &worker;
- ThreadStackExecutorBase::init_fun_t init_fun;
-
- explicit ThreadInit(Runnable &worker_in, ThreadStackExecutorBase::init_fun_t init_fun_in)
- : worker(worker_in), init_fun(std::move(init_fun_in)) {}
-
- void Run(FastOS_ThreadInterface *, void *) override;
-};
-
-void
-ThreadInit::Run(FastOS_ThreadInterface *, void *) {
- init_fun(worker);
-}
-
-}
-
ThreadStackExecutorBase::Worker::Worker()
: lock(),
cond(),
@@ -155,7 +135,7 @@ ThreadStackExecutorBase::run()
ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t taskLimit, init_fun_t init_fun)
: SyncableThreadExecutor(),
Runnable(),
- _pool(std::make_unique<FastOS_ThreadPool>()),
+ _pool(),
_lock(),
_cond(),
_stats(),
@@ -167,7 +147,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t taskLimit, init_fun_t
_taskCount(0),
_taskLimit(taskLimit),
_closed(false),
- _thread_init(std::make_unique<thread::ThreadInit>(*this, std::move(init_fun)))
+ _init_fun(init_fun)
{
assert(taskLimit > 0);
}
@@ -177,15 +157,13 @@ ThreadStackExecutorBase::start(uint32_t threads)
{
assert(threads > 0);
for (uint32_t i = 0; i < threads; ++i) {
- FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get());
- assert(thread != nullptr);
- (void)thread;
+ _pool.start(*this, _init_fun);
}
}
size_t
ThreadStackExecutorBase::getNumThreads() const {
- return _pool->GetNumStartedThreads();
+ return _pool.size();
}
void
@@ -315,12 +293,11 @@ ThreadStackExecutorBase::cleanup()
{
shutdown().sync();
_executorCompletion.countDown();
- _pool->Close();
+ _pool.join();
}
ThreadStackExecutorBase::~ThreadStackExecutorBase()
{
- assert(_pool->isClosed());
assert(_taskCount == 0);
assert(_blocked.empty());
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 501fde92f4c..765499b73bc 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -2,21 +2,17 @@
#pragma once
+#include "thread.h"
#include "threadexecutor.h"
#include "eventbarrier.hpp"
#include "arrayqueue.hpp"
#include "gate.h"
-#include "runnable.h"
#include "executor_idle_tracking.h"
#include <vector>
#include <functional>
-class FastOS_ThreadPool;
-
namespace vespalib {
-namespace thread { struct ThreadInit; }
-
/**
* An executor service that executes tasks in multiple threads.
**/
@@ -73,7 +69,7 @@ private:
void unblock();
};
- std::unique_ptr<FastOS_ThreadPool> _pool;
+ ThreadPool _pool;
mutable std::mutex _lock;
std::condition_variable _cond;
ExecutorStats _stats;
@@ -86,7 +82,7 @@ private:
uint32_t _taskCount;
uint32_t _taskLimit;
bool _closed;
- std::unique_ptr<thread::ThreadInit> _thread_init;
+ init_fun_t _init_fun;
static thread_local ThreadStackExecutorBase *_master;
void block_thread(const unique_lock &, BlockedThread &blocked_thread);
@@ -225,4 +221,3 @@ public:
};
} // namespace vespalib
-