diff options
5 files changed, 35 insertions, 46 deletions
diff --git a/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 14a235f7257..7fa2d357a5e 100644 --- a/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -54,29 +54,20 @@ AdaptiveSequencedExecutor::Self::~Self() AdaptiveSequencedExecutor::ThreadTools::ThreadTools(AdaptiveSequencedExecutor &parent_in) : parent(parent_in), - pool(std::make_unique<FastOS_ThreadPool>()), + pool(), allow_worker_exit() { } AdaptiveSequencedExecutor::ThreadTools::~ThreadTools() { - assert(pool->isClosed()); -} - -void -AdaptiveSequencedExecutor::ThreadTools::Run(FastOS_ThreadInterface *, void *) -{ - parent.worker_main(); } void AdaptiveSequencedExecutor::ThreadTools::start(size_t num_threads) { for (size_t i = 0; i < num_threads; ++i) { - FastOS_ThreadInterface *thread = pool->NewThread(this); - assert(thread != nullptr); - (void)thread; + pool.start([this](){ parent.worker_main(); }); } } @@ -84,7 +75,7 @@ void AdaptiveSequencedExecutor::ThreadTools::close() { allow_worker_exit.countDown(); - pool->Close(); + pool.join(); } //----------------------------------------------------------------------------- diff --git a/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index fee9b8a61f8..ee12af77a61 100644 --- a/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -3,11 +3,11 @@ #pragma once #include "isequencedtaskexecutor.h" +#include "thread.h" #include <vespa/vespalib/util/executor_idle_tracking.h> #include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/eventbarrier.hpp> -#include <vespa/fastos/thread.h> #include <mutex> #include <condition_variable> #include <optional> @@ -113,14 +113,12 @@ private: /** * Stuff related to worker thread startup and shutdown. **/ - struct ThreadTools : FastOS_Runnable { - static constexpr size_t STACK_SIZE = (256 * 1024); + struct ThreadTools { AdaptiveSequencedExecutor &parent; - std::unique_ptr<FastOS_ThreadPool> pool; + ThreadPool pool; Gate allow_worker_exit; ThreadTools(AdaptiveSequencedExecutor &parent_in); ~ThreadTools(); - void Run(FastOS_ThreadInterface *, void *) override; void start(size_t num_threads); void close(); }; diff --git a/vespalib/src/vespa/vespalib/util/shutdownguard.cpp b/vespalib/src/vespa/vespalib/util/shutdownguard.cpp index e3e56dc78cb..3d7404028a1 100644 --- a/vespalib/src/vespa/vespalib/util/shutdownguard.cpp +++ b/vespalib/src/vespa/vespalib/util/shutdownguard.cpp @@ -1,19 +1,16 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "shutdownguard.h" #include <unistd.h> -#include <thread> #include <vespa/log/log.h> LOG_SETUP(".vespalib.shutdownguard"); namespace vespalib { -namespace { -enum { STACK_SIZE = (1u << 16) }; -} -void ShutdownGuard::Run(FastOS_ThreadInterface *, void *) +void +ShutdownGuard::run() { - while (_dieAtTime > steady_clock::now() && ! GetThread()->GetBreakFlag()) { + while (_dieAtTime > steady_clock::now() && !_cancel.load(std::memory_order_relaxed)) { std::this_thread::sleep_for(5ms); } if (_dieAtTime <= steady_clock::now()) { @@ -22,19 +19,17 @@ void ShutdownGuard::Run(FastOS_ThreadInterface *, void *) } } -ShutdownGuard::ShutdownGuard(duration millis) : - FastOS_Runnable(), - _pool(1), +ShutdownGuard::ShutdownGuard(duration millis) + : _thread(), _dieAtTime(steady_clock::now() + millis) { - _pool.NewThread(this); + _thread = std::thread(&ShutdownGuard::run, this); } ShutdownGuard::~ShutdownGuard() { - GetThread()->SetBreakFlag(); - GetThread()->Join(); - _pool.Close(); + _cancel = true; + _thread.join(); } } diff --git a/vespalib/src/vespa/vespalib/util/shutdownguard.h b/vespalib/src/vespa/vespalib/util/shutdownguard.h index d76d4deb5d2..fb83c2f5977 100644 --- a/vespalib/src/vespa/vespalib/util/shutdownguard.h +++ b/vespalib/src/vespa/vespalib/util/shutdownguard.h @@ -2,7 +2,8 @@ #pragma once #include <vespa/vespalib/util/time.h> -#include <vespa/fastos/thread.h> +#include <thread> +#include <atomic> namespace vespalib { @@ -13,12 +14,13 @@ namespace vespalib { * termination. * A separate shutdown thread will perform the actual _exit() call. **/ -class ShutdownGuard : public FastOS_Runnable +class ShutdownGuard { - FastOS_ThreadPool _pool; - steady_time _dieAtTime; + std::thread _thread; + steady_time _dieAtTime; + std::atomic<bool> _cancel; - void Run(FastOS_ThreadInterface *, void *) override; + void run(); public: /** diff --git a/vespalog/src/test/threads/testthreads.cpp b/vespalog/src/test/threads/testthreads.cpp index ed2683b5c35..0c9b3a1cdb2 100644 --- a/vespalog/src/test/threads/testthreads.cpp +++ b/vespalog/src/test/threads/testthreads.cpp @@ -1,5 +1,4 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/fastos/thread.h> #include <vespa/log/bufferedlogger.h> #include <array> #include <iostream> @@ -10,6 +9,7 @@ #include <unistd.h> #include <sys/stat.h> #include <cstdlib> +#include <vector> using std::string; using namespace std::chrono_literals; @@ -17,28 +17,28 @@ using namespace std::chrono; LOG_SETUP(".threadtest"); -class FileThread : public FastOS_Runnable +class FileThread { std::atomic<bool> _done; string _file; public: FileThread(string file) : _done(false), _file(file) {} - void Run(FastOS_ThreadInterface *thread, void *arg) override; + void entry(); void stop() { _done.store(true, std::memory_order_relaxed); } }; -class LoggerThread : public FastOS_Runnable +class LoggerThread { std::atomic<bool> _done; public: std::atomic<bool> _useLogBuffer; LoggerThread() : _done(false), _useLogBuffer(false) {} - void Run(FastOS_ThreadInterface *thread, void *arg) override; + void entry(); void stop() { _done.store(true, std::memory_order_relaxed); } }; void -FileThread::Run(FastOS_ThreadInterface *, void *) +FileThread::entry() { unlink(_file.c_str()); while (!_done.load(std::memory_order_relaxed)) { @@ -63,7 +63,7 @@ FileThread::Run(FastOS_ThreadInterface *, void *) void -LoggerThread::Run(FastOS_ThreadInterface *, void *) +LoggerThread::entry() { int counter = 0; while (!_done.load(std::memory_order_relaxed)) { @@ -89,11 +89,12 @@ public: } }; + int ThreadTester::Main() { std::cerr << "Testing that logging is threadsafe. 5 sec test.\n"; - FastOS_ThreadPool pool; + std::vector<std::thread> threads; const int numWriters = 30; const int numLoggers = 10; @@ -105,11 +106,11 @@ ThreadTester::Main() char filename[100]; snprintf(filename, sizeof(filename), "empty.%d", i); writers[i] = std::make_unique<FileThread>(filename); - pool.NewThread(writers[i].get()); + threads.emplace_back([obj = writers[i].get()](){ obj->entry(); }); } for (int i = 0; i < numLoggers; i++) { loggers[i] = std::make_unique<LoggerThread>(); - pool.NewThread(loggers[i].get()); + threads.emplace_back([obj = loggers[i].get()](){ obj->entry(); }); } steady_clock::time_point start = steady_clock::now(); @@ -136,7 +137,9 @@ ThreadTester::Main() writers[i]->stop(); } - pool.Close(); + for (auto &thread: threads) { + thread.join(); + } return 0; } |