aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-02-17 17:20:35 +0100
committerGitHub <noreply@github.com>2023-02-17 17:20:35 +0100
commit805e24544b2b43abe28b77829de30930b3eea321 (patch)
treea05e1fe4bcaa6c9b2aa7f8550a0725991a70b3e6
parent083f5276f8b40c51e523b2a5ac00d5a1f40515b7 (diff)
parent3ea4d22e714092d3600c268feb8abf4d4c5c7fd6 (diff)
Merge pull request #26093 from vespa-engine/havardpe/use-less-fastos-thread
use fastos thread less
-rw-r--r--vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp15
-rw-r--r--vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h8
-rw-r--r--vespalib/src/vespa/vespalib/util/shutdownguard.cpp21
-rw-r--r--vespalib/src/vespa/vespalib/util/shutdownguard.h12
-rw-r--r--vespalog/src/test/threads/testthreads.cpp25
5 files changed, 35 insertions, 46 deletions
diff --git a/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 14a235f7257..7fa2d357a5e 100644
--- a/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -54,29 +54,20 @@ AdaptiveSequencedExecutor::Self::~Self()
AdaptiveSequencedExecutor::ThreadTools::ThreadTools(AdaptiveSequencedExecutor &parent_in)
: parent(parent_in),
- pool(std::make_unique<FastOS_ThreadPool>()),
+ pool(),
allow_worker_exit()
{
}
AdaptiveSequencedExecutor::ThreadTools::~ThreadTools()
{
- assert(pool->isClosed());
-}
-
-void
-AdaptiveSequencedExecutor::ThreadTools::Run(FastOS_ThreadInterface *, void *)
-{
- parent.worker_main();
}
void
AdaptiveSequencedExecutor::ThreadTools::start(size_t num_threads)
{
for (size_t i = 0; i < num_threads; ++i) {
- FastOS_ThreadInterface *thread = pool->NewThread(this);
- assert(thread != nullptr);
- (void)thread;
+ pool.start([this](){ parent.worker_main(); });
}
}
@@ -84,7 +75,7 @@ void
AdaptiveSequencedExecutor::ThreadTools::close()
{
allow_worker_exit.countDown();
- pool->Close();
+ pool.join();
}
//-----------------------------------------------------------------------------
diff --git a/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index fee9b8a61f8..ee12af77a61 100644
--- a/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -3,11 +3,11 @@
#pragma once
#include "isequencedtaskexecutor.h"
+#include "thread.h"
#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/eventbarrier.hpp>
-#include <vespa/fastos/thread.h>
#include <mutex>
#include <condition_variable>
#include <optional>
@@ -113,14 +113,12 @@ private:
/**
* Stuff related to worker thread startup and shutdown.
**/
- struct ThreadTools : FastOS_Runnable {
- static constexpr size_t STACK_SIZE = (256 * 1024);
+ struct ThreadTools {
AdaptiveSequencedExecutor &parent;
- std::unique_ptr<FastOS_ThreadPool> pool;
+ ThreadPool pool;
Gate allow_worker_exit;
ThreadTools(AdaptiveSequencedExecutor &parent_in);
~ThreadTools();
- void Run(FastOS_ThreadInterface *, void *) override;
void start(size_t num_threads);
void close();
};
diff --git a/vespalib/src/vespa/vespalib/util/shutdownguard.cpp b/vespalib/src/vespa/vespalib/util/shutdownguard.cpp
index e3e56dc78cb..3d7404028a1 100644
--- a/vespalib/src/vespa/vespalib/util/shutdownguard.cpp
+++ b/vespalib/src/vespa/vespalib/util/shutdownguard.cpp
@@ -1,19 +1,16 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "shutdownguard.h"
#include <unistd.h>
-#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".vespalib.shutdownguard");
namespace vespalib {
-namespace {
-enum { STACK_SIZE = (1u << 16) };
-}
-void ShutdownGuard::Run(FastOS_ThreadInterface *, void *)
+void
+ShutdownGuard::run()
{
- while (_dieAtTime > steady_clock::now() && ! GetThread()->GetBreakFlag()) {
+ while (_dieAtTime > steady_clock::now() && !_cancel.load(std::memory_order_relaxed)) {
std::this_thread::sleep_for(5ms);
}
if (_dieAtTime <= steady_clock::now()) {
@@ -22,19 +19,17 @@ void ShutdownGuard::Run(FastOS_ThreadInterface *, void *)
}
}
-ShutdownGuard::ShutdownGuard(duration millis) :
- FastOS_Runnable(),
- _pool(1),
+ShutdownGuard::ShutdownGuard(duration millis)
+ : _thread(),
_dieAtTime(steady_clock::now() + millis)
{
- _pool.NewThread(this);
+ _thread = std::thread(&ShutdownGuard::run, this);
}
ShutdownGuard::~ShutdownGuard()
{
- GetThread()->SetBreakFlag();
- GetThread()->Join();
- _pool.Close();
+ _cancel = true;
+ _thread.join();
}
}
diff --git a/vespalib/src/vespa/vespalib/util/shutdownguard.h b/vespalib/src/vespa/vespalib/util/shutdownguard.h
index d76d4deb5d2..fb83c2f5977 100644
--- a/vespalib/src/vespa/vespalib/util/shutdownguard.h
+++ b/vespalib/src/vespa/vespalib/util/shutdownguard.h
@@ -2,7 +2,8 @@
#pragma once
#include <vespa/vespalib/util/time.h>
-#include <vespa/fastos/thread.h>
+#include <thread>
+#include <atomic>
namespace vespalib {
@@ -13,12 +14,13 @@ namespace vespalib {
* termination.
* A separate shutdown thread will perform the actual _exit() call.
**/
-class ShutdownGuard : public FastOS_Runnable
+class ShutdownGuard
{
- FastOS_ThreadPool _pool;
- steady_time _dieAtTime;
+ std::thread _thread;
+ steady_time _dieAtTime;
+ std::atomic<bool> _cancel;
- void Run(FastOS_ThreadInterface *, void *) override;
+ void run();
public:
/**
diff --git a/vespalog/src/test/threads/testthreads.cpp b/vespalog/src/test/threads/testthreads.cpp
index ed2683b5c35..0c9b3a1cdb2 100644
--- a/vespalog/src/test/threads/testthreads.cpp
+++ b/vespalog/src/test/threads/testthreads.cpp
@@ -1,5 +1,4 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/fastos/thread.h>
#include <vespa/log/bufferedlogger.h>
#include <array>
#include <iostream>
@@ -10,6 +9,7 @@
#include <unistd.h>
#include <sys/stat.h>
#include <cstdlib>
+#include <vector>
using std::string;
using namespace std::chrono_literals;
@@ -17,28 +17,28 @@ using namespace std::chrono;
LOG_SETUP(".threadtest");
-class FileThread : public FastOS_Runnable
+class FileThread
{
std::atomic<bool> _done;
string _file;
public:
FileThread(string file) : _done(false), _file(file) {}
- void Run(FastOS_ThreadInterface *thread, void *arg) override;
+ void entry();
void stop() { _done.store(true, std::memory_order_relaxed); }
};
-class LoggerThread : public FastOS_Runnable
+class LoggerThread
{
std::atomic<bool> _done;
public:
std::atomic<bool> _useLogBuffer;
LoggerThread() : _done(false), _useLogBuffer(false) {}
- void Run(FastOS_ThreadInterface *thread, void *arg) override;
+ void entry();
void stop() { _done.store(true, std::memory_order_relaxed); }
};
void
-FileThread::Run(FastOS_ThreadInterface *, void *)
+FileThread::entry()
{
unlink(_file.c_str());
while (!_done.load(std::memory_order_relaxed)) {
@@ -63,7 +63,7 @@ FileThread::Run(FastOS_ThreadInterface *, void *)
void
-LoggerThread::Run(FastOS_ThreadInterface *, void *)
+LoggerThread::entry()
{
int counter = 0;
while (!_done.load(std::memory_order_relaxed)) {
@@ -89,11 +89,12 @@ public:
}
};
+
int
ThreadTester::Main()
{
std::cerr << "Testing that logging is threadsafe. 5 sec test.\n";
- FastOS_ThreadPool pool;
+ std::vector<std::thread> threads;
const int numWriters = 30;
const int numLoggers = 10;
@@ -105,11 +106,11 @@ ThreadTester::Main()
char filename[100];
snprintf(filename, sizeof(filename), "empty.%d", i);
writers[i] = std::make_unique<FileThread>(filename);
- pool.NewThread(writers[i].get());
+ threads.emplace_back([obj = writers[i].get()](){ obj->entry(); });
}
for (int i = 0; i < numLoggers; i++) {
loggers[i] = std::make_unique<LoggerThread>();
- pool.NewThread(loggers[i].get());
+ threads.emplace_back([obj = loggers[i].get()](){ obj->entry(); });
}
steady_clock::time_point start = steady_clock::now();
@@ -136,7 +137,9 @@ ThreadTester::Main()
writers[i]->stop();
}
- pool.Close();
+ for (auto &thread: threads) {
+ thread.join();
+ }
return 0;
}