From 6a5818c4f6a700b56c0d30ac2d36d32d2e2a3bc0 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Thu, 16 Feb 2023 10:26:19 +0000 Subject: remove FastOS_Thread (and FastOS_ThreadPool/FastOS_Runnable) --- fastos/src/tests/CMakeLists.txt | 21 -- fastos/src/tests/job.h | 51 ---- fastos/src/tests/tests.h | 10 +- fastos/src/tests/thread_joinwait_test.cpp | 118 -------- fastos/src/tests/thread_stats_test.cpp | 129 --------- fastos/src/tests/thread_test_base.hpp | 161 ---------- fastos/src/tests/threadtest.cpp | 283 ------------------ fastos/src/tests/typetest.cpp | 3 - fastos/src/vespa/fastos/CMakeLists.txt | 2 - fastos/src/vespa/fastos/thread.cpp | 363 ----------------------- fastos/src/vespa/fastos/thread.h | 467 ------------------------------ fastos/src/vespa/fastos/unix_thread.cpp | 37 --- fastos/src/vespa/fastos/unix_thread.h | 37 --- 13 files changed, 2 insertions(+), 1680 deletions(-) delete mode 100644 fastos/src/tests/job.h delete mode 100644 fastos/src/tests/thread_joinwait_test.cpp delete mode 100644 fastos/src/tests/thread_stats_test.cpp delete mode 100644 fastos/src/tests/thread_test_base.hpp delete mode 100644 fastos/src/tests/threadtest.cpp delete mode 100644 fastos/src/vespa/fastos/thread.cpp delete mode 100644 fastos/src/vespa/fastos/thread.h delete mode 100644 fastos/src/vespa/fastos/unix_thread.cpp delete mode 100644 fastos/src/vespa/fastos/unix_thread.h diff --git a/fastos/src/tests/CMakeLists.txt b/fastos/src/tests/CMakeLists.txt index 3bf68a88e79..7d4b014b6b3 100644 --- a/fastos/src/tests/CMakeLists.txt +++ b/fastos/src/tests/CMakeLists.txt @@ -6,27 +6,6 @@ vespa_add_executable(fastos_filetest_app TEST fastos ) vespa_add_test(NAME fastos_filetest_app NO_VALGRIND COMMAND fastos_filetest_app) -vespa_add_executable(fastos_thread_stats_test_app TEST - SOURCES - thread_stats_test.cpp - DEPENDS - fastos -) -vespa_add_test(NAME fastos_thread_stats_test_app NO_VALGRIND COMMAND fastos_thread_stats_test_app) -vespa_add_executable(fastos_thread_joinwait_test_app TEST - SOURCES - thread_joinwait_test.cpp - DEPENDS - fastos -) -vespa_add_test(NAME fastos_thread_joinwait_test_app NO_VALGRIND COMMAND fastos_thread_joinwait_test_app) -vespa_add_executable(fastos_threadtest_app TEST - SOURCES - threadtest.cpp - DEPENDS - fastos -) -vespa_add_test(NAME fastos_threadtest_app NO_VALGRIND COMMAND fastos_threadtest_app) vespa_add_executable(fastos_typetest_app TEST SOURCES typetest.cpp diff --git a/fastos/src/tests/job.h b/fastos/src/tests/job.h deleted file mode 100644 index 4546cfe1daa..00000000000 --- a/fastos/src/tests/job.h +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include - -enum JobCode -{ - PRINT_MESSAGE_AND_WAIT3MSEC, - INCREASE_NUMBER, - WAIT_FOR_BREAK_FLAG, - WAIT_FOR_THREAD_TO_FINISH, - TEST_ID, - SILENTNOP, - NOP -}; - -class Job -{ -private: - Job(const Job &); - Job &operator=(const Job&); - -public: - JobCode code; - char *message; - std::mutex *mutex; - std::condition_variable *condition; - FastOS_ThreadInterface *otherThread, *ownThread; - std::atomic result; - FastOS_ThreadId _threadId; - - Job() - : code(NOP), - message(nullptr), - mutex(nullptr), - condition(nullptr), - otherThread(nullptr), - ownThread(nullptr), - result(-1), - _threadId() - { - } - - ~Job() - { - if(message != nullptr) - free(message); - } -}; diff --git a/fastos/src/tests/tests.h b/fastos/src/tests/tests.h index 3a6f2ef9010..9cd7a10ab48 100644 --- a/fastos/src/tests/tests.h +++ b/fastos/src/tests/tests.h @@ -1,8 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include #include #include +#include +#include class BaseTest { @@ -83,13 +84,6 @@ public: return Progress(result, string); } - bool Progress (bool result, const char *str, const FastOS_ThreadInterface *s1) - { - char string[MAX_STR_LEN-100]; - snprintf(string, sizeof(string), str, s1); - return Progress(result, string); - } - bool Progress (bool result, const char *str, const char *s1, const char *s2) { char string[MAX_STR_LEN-100]; diff --git a/fastos/src/tests/thread_joinwait_test.cpp b/fastos/src/tests/thread_joinwait_test.cpp deleted file mode 100644 index 6c7e8a7dc3c..00000000000 --- a/fastos/src/tests/thread_joinwait_test.cpp +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "tests.h" -#include "job.h" -#include "thread_test_base.hpp" - -class Thread_JoinWait_Test : public ThreadTestBase -{ - int Main () override; - - void SingleThreadJoinWaitMultipleTest(int variant) - { - bool rc=false; - - char testName[300]; - - snprintf(testName, sizeof(testName), "Single Thread Join Wait Multiple Test %d", variant); - TestHeader(testName); - - FastOS_ThreadPool pool; - - const int testThreads=5; - int lastThreadNum = testThreads-1; - int i; - - Job jobs[testThreads]; - - std::mutex jobMutex; - - // The mutex is used to pause the first threads until we have created - // the last one. - jobMutex.lock(); - - for(i=0; i(&jobs[i])); - - rc = (jobs[i].ownThread != nullptr); - Progress(rc, "Creating Thread %d", i+1); - - if(!rc) - break; - } - - if (rc) - { - jobs[lastThreadNum].code = (((variant & 2) != 0) ? NOP : PRINT_MESSAGE_AND_WAIT3MSEC); - jobs[lastThreadNum].message = strdup("This is the thread that others wait for."); - - FastOS_ThreadInterface *lastThread; - - lastThread = pool.NewThread(this, - static_cast - (&jobs[lastThreadNum])); - - rc = (lastThread != nullptr); - Progress(rc, "Creating last thread"); - - if (rc) - { - for(i=0; iJoin(); - Progress(true, "Thread %d finished.", i+1); - } - } - - Progress(true, "Closing pool."); - pool.Close(); - Progress(true, "Pool closed."); - PrintSeparator(); - } - -}; - -int Thread_JoinWait_Test::Main () -{ - printf("grep for the string '%s' to detect failures.\n\n", failString); - time_t before = time(0); - - SingleThreadJoinWaitMultipleTest(0); - { time_t now = time(0); printf("[%ld seconds]\n", now-before); before = now; } - SingleThreadJoinWaitMultipleTest(1); - { time_t now = time(0); printf("[%ld seconds]\n", now-before); before = now; } - SingleThreadJoinWaitMultipleTest(2); - { time_t now = time(0); printf("[%ld seconds]\n", now-before); before = now; } - SingleThreadJoinWaitMultipleTest(3); - { time_t now = time(0); printf("[%ld seconds]\n", now-before); before = now; } - SingleThreadJoinWaitMultipleTest(2); - { time_t now = time(0); printf("[%ld seconds]\n", now-before); before = now; } - SingleThreadJoinWaitMultipleTest(1); - { time_t now = time(0); printf("[%ld seconds]\n", now-before); before = now; } - SingleThreadJoinWaitMultipleTest(0); - { time_t now = time(0); printf("[%ld seconds]\n", now-before); before = now; } - - printf("END OF TEST (%s)\n", _argv[0]); - return allWasOk() ? 0 : 1; -} - -int main (int argc, char **argv) -{ - Thread_JoinWait_Test app; - setvbuf(stdout, nullptr, _IOLBF, 8192); - return app.Entry(argc, argv); -} diff --git a/fastos/src/tests/thread_stats_test.cpp b/fastos/src/tests/thread_stats_test.cpp deleted file mode 100644 index 40c1199135c..00000000000 --- a/fastos/src/tests/thread_stats_test.cpp +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "tests.h" -#include "job.h" -#include "thread_test_base.hpp" - -class Thread_Stats_Test : public ThreadTestBase -{ - void ThreadStatsTest () - { - int inactiveThreads; - int activeThreads; - int startedThreads; - - TestHeader("Thread Statistics Test"); - - FastOS_ThreadPool pool; - Job job[2]; - - inactiveThreads = pool.GetNumInactiveThreads(); - Progress(inactiveThreads == 0, "Initial inactive threads = %d", inactiveThreads); - activeThreads = pool.GetNumActiveThreads(); - Progress(activeThreads == 0, "Initial active threads = %d", activeThreads); - startedThreads = pool.GetNumStartedThreads(); - Progress(startedThreads == 0, "Initial started threads = %d", startedThreads); - - job[0].code = WAIT_FOR_BREAK_FLAG; - job[0].ownThread = pool.NewThread(this, static_cast(&job[0])); - - inactiveThreads = pool.GetNumInactiveThreads(); - Progress(inactiveThreads == 0, "Inactive threads = %d", inactiveThreads); - activeThreads = pool.GetNumActiveThreads(); - Progress(activeThreads == 1, "Active threads = %d", activeThreads); - startedThreads = pool.GetNumStartedThreads(); - Progress(startedThreads == 1, "Started threads = %d", startedThreads); - - job[1].code = WAIT_FOR_BREAK_FLAG; - job[1].ownThread = pool.NewThread(this, static_cast(&job[1])); - - inactiveThreads = pool.GetNumInactiveThreads(); - Progress(inactiveThreads == 0, "Inactive threads = %d", inactiveThreads); - activeThreads = pool.GetNumActiveThreads(); - Progress(activeThreads == 2, "Active threads = %d", activeThreads); - startedThreads = pool.GetNumStartedThreads(); - Progress(startedThreads == 2, "Started threads = %d", startedThreads); - - Progress(true, "Setting breakflag on threads..."); - job[0].ownThread->SetBreakFlag(); - job[1].ownThread->SetBreakFlag(); - - job[0].ownThread->Join(); - job[1].ownThread->Join(); - while (pool.GetNumInactiveThreads() != 2) { - std::this_thread::sleep_for(1ms); - } - - inactiveThreads = pool.GetNumInactiveThreads(); - Progress(inactiveThreads == 2, "Inactive threads = %d", inactiveThreads); - activeThreads = pool.GetNumActiveThreads(); - Progress(activeThreads == 0, "Active threads = %d", activeThreads); - startedThreads = pool.GetNumStartedThreads(); - Progress(startedThreads == 2, "Started threads = %d", startedThreads); - - Progress(true, "Repeating process in the same pool..."); - - job[0].code = WAIT_FOR_BREAK_FLAG; - job[0].ownThread = pool.NewThread(this, static_cast(&job[0])); - - inactiveThreads = pool.GetNumInactiveThreads(); - Progress(inactiveThreads == 1, "Inactive threads = %d", inactiveThreads); - activeThreads = pool.GetNumActiveThreads(); - Progress(activeThreads == 1, "Active threads = %d", activeThreads); - startedThreads = pool.GetNumStartedThreads(); - Progress(startedThreads == 3, "Started threads = %d", startedThreads); - - job[1].code = WAIT_FOR_BREAK_FLAG; - job[1].ownThread = pool.NewThread(this, static_cast(&job[1])); - - inactiveThreads = pool.GetNumInactiveThreads(); - Progress(inactiveThreads == 0, "Inactive threads = %d", inactiveThreads); - activeThreads = pool.GetNumActiveThreads(); - Progress(activeThreads == 2, "Active threads = %d", activeThreads); - startedThreads = pool.GetNumStartedThreads(); - Progress(startedThreads == 4, "Started threads = %d", startedThreads); - - Progress(true, "Setting breakflag on threads..."); - job[0].ownThread->SetBreakFlag(); - job[1].ownThread->SetBreakFlag(); - - job[0].ownThread->Join(); - job[1].ownThread->Join(); - while (pool.GetNumInactiveThreads() != 2) { - std::this_thread::sleep_for(1ms); - } - - inactiveThreads = pool.GetNumInactiveThreads(); - Progress(inactiveThreads == 2, "Inactive threads = %d", inactiveThreads); - activeThreads = pool.GetNumActiveThreads(); - Progress(activeThreads == 0, "Active threads = %d", activeThreads); - startedThreads = pool.GetNumStartedThreads(); - Progress(startedThreads == 4, "Started threads = %d", startedThreads); - - pool.Close(); - Progress(true, "Pool closed."); - - PrintSeparator(); - } - - int Main () override; -}; - -int Thread_Stats_Test::Main () -{ - printf("grep for the string '%s' to detect failures.\n\n", failString); - time_t before = time(0); - - ThreadStatsTest(); - { time_t now = time(0); printf("[%ld seconds]\n", now-before); before = now; } - - printf("END OF TEST (%s)\n", _argv[0]); - return allWasOk() ? 0 : 1; -} - -int main (int argc, char **argv) -{ - Thread_Stats_Test app; - setvbuf(stdout, nullptr, _IOLBF, 8192); - return app.Entry(argc, argv); -} diff --git a/fastos/src/tests/thread_test_base.hpp b/fastos/src/tests/thread_test_base.hpp deleted file mode 100644 index eb994537f6e..00000000000 --- a/fastos/src/tests/thread_test_base.hpp +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include - -static std::atomic number; -#define INCREASE_NUMBER_AMOUNT 10000 - -using namespace std::chrono_literals; - -class ThreadTestBase : public BaseTest, public FastOS_Runnable -{ -private: - std::mutex printMutex; - -public: - ThreadTestBase(void) - : printMutex() - { - } - virtual ~ThreadTestBase() {} - - void PrintProgress (char *string) override { - std::lock_guard guard(printMutex); - BaseTest::PrintProgress(string); - } - - void Run (FastOS_ThreadInterface *thread, void *arg) override; - - void WaitForThreadsToFinish (Job *jobs, int count) { - int i; - - Progress(true, "Waiting for threads to finish..."); - for(;;) { - bool threadsFinished=true; - - for (i=0; i(arg); - char someStack[15*1024]; - - memset(someStack, 0, 15*1024); - - switch(job->code) - { - case SILENTNOP: - { - job->result = 1; - break; - } - - case NOP: - { - Progress(true, "Doing NOP"); - job->result = 1; - break; - } - - case PRINT_MESSAGE_AND_WAIT3MSEC: - { - Progress(true, "Thread printing message: [%s]", job->message); - job->result = strlen(job->message); - - std::this_thread::sleep_for(3ms); - break; - } - - case INCREASE_NUMBER: - { - int result; - - std::unique_lock guard; - if(job->mutex != nullptr) { - guard = std::unique_lock(*job->mutex); - } - - result = static_cast(number.load(std::memory_order_relaxed)); - - int sleepOn = (INCREASE_NUMBER_AMOUNT/2) * 321/10000; - for (int i=0; i<(INCREASE_NUMBER_AMOUNT/2); i++) { - number.fetch_add(2, std::memory_order_relaxed); - - if (i == sleepOn) - std::this_thread::sleep_for(1ms); - } - - guard = std::unique_lock(); - - job->result = result; // This marks the end of the thread - - break; - } - - case WAIT_FOR_BREAK_FLAG: - { - for(;;) { - std::this_thread::sleep_for(1us); - - if (thread->GetBreakFlag()) { - Progress(true, "Thread %p got breakflag", thread); - break; - } - } - break; - } - - case WAIT_FOR_THREAD_TO_FINISH: - { - std::unique_lock guard; - if (job->mutex != nullptr) { - guard = std::unique_lock(*job->mutex); - } - - if (job->otherThread != nullptr) - job->otherThread->Join(); - - break; - } - - case TEST_ID: - { - job->mutex->lock(); // Initially the parent threads owns the lock - job->mutex->unlock(); // It is unlocked when we should start - - FastOS_ThreadId currentId = FastOS_Thread::GetCurrentThreadId(); - - if(currentId == job->_threadId) - job->result = 1; - else - job->result = -1; - break; - } - - default: - Progress(false, "Unknown jobcode"); - break; - } -} diff --git a/fastos/src/tests/threadtest.cpp b/fastos/src/tests/threadtest.cpp deleted file mode 100644 index 563b41ac229..00000000000 --- a/fastos/src/tests/threadtest.cpp +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "tests.h" -#include "job.h" -#include "thread_test_base.hpp" -#include -#include - -#define MAX_THREADS 7 - -using namespace std::chrono; -using namespace std::chrono_literals; - -class ThreadTest : public ThreadTestBase -{ - int Main () override; - - void TooManyThreadsTest () - { - TestHeader("Too Many Threads Test"); - static constexpr size_t message_size = 100; - - FastOS_ThreadPool *pool = new FastOS_ThreadPool(MAX_THREADS); - - if (Progress(pool != nullptr, "Allocating ThreadPool")) { - int i; - Job jobs[MAX_THREADS+1]; - - for (i=0; i(malloc(message_size)); - if (jobs[i].message == nullptr) { - abort(); // GCC may infer that a potentially null ptr is passed to sprintf - } - snprintf(jobs[i].message, message_size, "Thread %d invocation", i+1); - } - - for (i=0; iGetNumInactiveThreads() > 0); - jobs[MAX_THREADS].code = PRINT_MESSAGE_AND_WAIT3MSEC; - bool rc = (nullptr == pool->NewThread(this, static_cast(&jobs[MAX_THREADS]))); - Progress(rc, "Creating too many threads should fail."); - } else { - jobs[i].ownThread = pool->NewThread(this, static_cast(&jobs[i])); - Progress(jobs[i].ownThread != nullptr, "Creating Thread"); - } - } - for (i=0; iSetBreakFlag(); - } - - Progress(true, "Closing threadpool..."); - pool->Close(); - - Progress(true, "Deleting threadpool..."); - delete(pool); - } - PrintSeparator(); - } - - void CreateSingleThreadAndJoin () - { - TestHeader("Create Single Thread And Join Test"); - - FastOS_ThreadPool *pool = new FastOS_ThreadPool; - - if (Progress(pool != nullptr, "Allocating ThreadPool")) { - Job job; - - job.code = NOP; - job.result = -1; - - bool rc = (nullptr != pool->NewThread(this, &job)); - Progress(rc, "Creating Thread"); - - WaitForThreadsToFinish(&job, 1); - } - - Progress(true, "Closing threadpool..."); - pool->Close(); - - Progress(true, "Deleting threadpool..."); - delete(pool); - PrintSeparator(); - } - - void ThreadCreatePerformance (bool silent, int count, int outercount) { - int i; - int j; - bool rc; - int threadsok; - - if (!silent) - TestHeader("Thread Create Performance"); - - FastOS_ThreadPool *pool = new FastOS_ThreadPool; - - if (!silent) - Progress(pool != nullptr, "Allocating ThreadPool"); - if (pool != nullptr) { - Job *jobs = new Job[count]; - - threadsok = 0; - steady_clock::time_point start = steady_clock::now(); - for (i = 0; i < count; i++) { - jobs[i].code = SILENTNOP; - jobs[i].ownThread = pool->NewThread(this, &jobs[i]); - rc = (jobs[i].ownThread != nullptr); - if (rc) - threadsok++; - } - - for (j = 0; j < outercount; j++) { - for (i = 0; i < count; i++) { - if (jobs[i].ownThread != nullptr) - jobs[i].ownThread->Join(); - jobs[i].ownThread = pool->NewThread(this, &jobs[i]); - rc = (jobs[i].ownThread != nullptr); - if (rc) - threadsok++; - } - } - for (i = 0; i < count; i++) { - if (jobs[i].ownThread != nullptr) - jobs[i].ownThread->Join(); - } - nanoseconds used = steady_clock::now() - start; - - if (!silent) { - double timeused = used.count() / 1000000000.0; - - Progress(true, "Used time: %2.3f", timeused); - ProgressFloat(true, "Threads/s: %6.1f", - static_cast(static_cast(threadsok) / timeused)); - } - if (threadsok != ((outercount + 1) * count)) - Progress(false, "Only started %d of %d threads", threadsok, - (outercount + 1) * count); - - if (!silent) - Progress(true, "Closing threadpool..."); - pool->Close(); - delete [] jobs; - - if (!silent) - Progress(true, "Deleting threadpool..."); - delete(pool); - if (!silent) - PrintSeparator(); - } - } - - void ClosePoolStability(void) { - int i; - TestHeader("ThreadPool close stability test"); - for (i = 0; i < 1000; i++) { - // Progress(true, "Creating pool iteration %d", i + 1); - ThreadCreatePerformance(true, 2, 1); - } - PrintSeparator(); - } - - - - void ClosePoolTest () - { - TestHeader("Close Pool Test"); - - FastOS_ThreadPool pool; - const int closePoolThreads=9; - Job jobs[closePoolThreads]; - - number = 0; - - for(int i=0; i(&jobs[i]))); - Progress(rc, "Creating Thread %d", i+1); - } - - Progress(true, "Waiting for threads to finish using pool.Close()..."); - pool.Close(); - Progress(true, "Pool closed."); - PrintSeparator(); - } - - void BreakFlagTest () { - TestHeader("BreakFlag Test"); - - FastOS_ThreadPool pool; - - const int breakFlagThreads=4; - - Job jobs[breakFlagThreads]; - - for (int i=0; i(&jobs[i]))); - Progress(rc, "Creating Thread %d", i+1); - } - - Progress(true, "Waiting for threads to finish using pool.Close()..."); - pool.Close(); - Progress(true, "Pool closed."); - PrintSeparator(); - } - - void ThreadIdTest () { - constexpr int numThreads = 5; - - TestHeader ("Thread Id Test"); - - FastOS_ThreadPool pool; - Job jobs[numThreads]; - std::mutex slowStartMutex; - - slowStartMutex.lock(); // Halt all threads until we want them to run - - for (int i=0; i(&jobs[i])); - bool rc=(jobs[i].ownThread != nullptr); - if (rc) { - jobs[i]._threadId = jobs[i].ownThread->GetThreadId(); - } - Progress(rc, "CreatingThread %d id:%lu", i+1, (unsigned long)(jobs[i]._threadId)); - - for (int j=0; j -#include - -// ---------------------------------------------------------------------- -// FastOS_ThreadPool -// ---------------------------------------------------------------------- - -FastOS_ThreadPool::FastOS_ThreadPool() : FastOS_ThreadPool(0) {} - -FastOS_ThreadPool::FastOS_ThreadPool(int maxThreads) - : _startedThreadsCount(0), - _closeFlagMutex(), - _closeCalledFlag(false), - _freeMutex(), - _liveMutex(), - _liveCond(), - _freeThreads(nullptr), - _activeThreads(nullptr), - _numFree(0), - _numActive(0), - _numTerminated(0), - _numLive(0), - _maxThreads(maxThreads) // 0 means unbounded -{ -} - -FastOS_ThreadPool::~FastOS_ThreadPool(void) -{ - Close(); -} - -void FastOS_ThreadPool::ThreadIsAboutToTerminate(FastOS_ThreadInterface *) -{ - assert(isClosed()); - - std::lock_guard guard(_liveMutex); - - _numTerminated++; - _numLive--; - if (_numLive == 0) { - _liveCond.notify_all(); - } -} - - -// This is a NOP if the thread isn't active. -void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread) -{ - std::lock_guard guard(_freeMutex); - - if(thread->_active) { - LinkOutThread(thread, &_activeThreads); - - thread->_active = false; - _numActive--; - - LinkInThread(thread, &_freeThreads); - _numFree++; - } -} - -void FastOS_ThreadPool::LinkOutThread (FastOS_ThreadInterface *thread, FastOS_ThreadInterface **listHead) -{ - if (thread->_prev != nullptr) - thread->_prev->_next = thread->_next; - if (thread->_next != nullptr) - thread->_next->_prev = thread->_prev; - - if (thread == *listHead) - *listHead = thread->_next; -} - -void FastOS_ThreadPool::LinkInThread (FastOS_ThreadInterface *thread, FastOS_ThreadInterface **listHead) -{ - thread->_prev = nullptr; - thread->_next = *listHead; - - if (*listHead != nullptr) - (*listHead)->_prev = thread; - - *listHead = thread; -} - - -// _freeMutex is held by caller. -void FastOS_ThreadPool::ActivateThread (FastOS_ThreadInterface *thread) -{ - LinkOutThread(thread, &_freeThreads); - LinkInThread(thread, &_activeThreads); - - thread->_active = true; - _numActive++; - _startedThreadsCount++; -} - - -// Allocate a thread, either from pool of free or by 'new'. Finally, -// make this thread call parameter fcn when it becomes active. -FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, void *arg) -{ - FastOS_ThreadInterface *thread=nullptr; - - std::unique_lock freeGuard(_freeMutex); - - if (!isClosed()) { - if ((thread = _freeThreads) != nullptr) { - // Reusing thread entry - _freeThreads = thread->_next; - _numFree--; - - ActivateThread(thread); - } else { - // Creating new thread entry - - if (_maxThreads != 0 && ((_numActive + _numFree) >= _maxThreads)) { - fprintf(stderr, "Error: Maximum number of threads (%d)" - " already allocated.\n", _maxThreads); - } else { - freeGuard.unlock(); - { - std::lock_guard liveGuard(_liveMutex); - _numLive++; - } - thread = FastOS_Thread::CreateThread(this); - - if (thread == nullptr) { - std::lock_guard liveGuard(_liveMutex); - _numLive--; - if (_numLive == 0) { - _liveCond.notify_all(); - } - } - freeGuard.lock(); - - if(thread != nullptr) - ActivateThread(thread); - } - } - } - - freeGuard.unlock(); - if(thread != nullptr) { - std::lock_guard liveGuard(_liveMutex); - thread->Dispatch(owner, arg); - } - - return thread; -} - - -void FastOS_ThreadPool::BreakThreads () -{ - FastOS_ThreadInterface *thread; - - std::lock_guard freeGuard(_freeMutex); - - // Notice all active threads that they should quit - for(thread=_activeThreads; thread != nullptr; thread=thread->_next) { - thread->SetBreakFlag(); - } - - // Notice all free threads that they should quit - for(thread=_freeThreads; thread != nullptr; thread=thread->_next) { - thread->SetBreakFlag(); - } -} - - -void FastOS_ThreadPool::JoinThreads () -{ - std::unique_lock liveGuard(_liveMutex); - while (_numLive > 0) { - _liveCond.wait(liveGuard); - } -} - -void FastOS_ThreadPool::DeleteThreads () -{ - FastOS_ThreadInterface *thread; - - std::lock_guard freeGuard(_freeMutex); - - assert(_numActive == 0); - assert(_numLive == 0); - - while((thread = _freeThreads) != nullptr) { - LinkOutThread(thread, &_freeThreads); - _numFree--; - // printf("deleting thread %p\n", thread); - delete(thread); - } - - assert(_numFree == 0); -} - -void FastOS_ThreadPool::Close () -{ - std::unique_lock closeFlagGuard(_closeFlagMutex); - if (!_closeCalledFlag) { - _closeCalledFlag = true; - closeFlagGuard.unlock(); - - BreakThreads(); - JoinThreads(); - DeleteThreads(); - } -} - -bool FastOS_ThreadPool::isClosed() -{ - std::lock_guard closeFlagGuard(_closeFlagMutex); - bool closed(_closeCalledFlag); - return closed; -} - -extern "C" -{ -void *FastOS_ThreadHook (void *arg) -{ - FastOS_ThreadInterface *thread = static_cast(arg); - thread->Hook(); - - return nullptr; -} -}; - - -// ---------------------------------------------------------------------- -// FastOS_ThreadInterface -// ---------------------------------------------------------------------- - -void FastOS_ThreadInterface::Hook () -{ - // Loop forever doing the following: Wait on the signal _dispatched. - // When awoken, call _start_fcn with the parameters. Then zero set - // things and return this to the owner, i.e. pool of free threads - bool finished=false; - bool deleteOnCompletion = false; - - while(!finished) { - - std::unique_lock dispatchedGuard(_dispatchedMutex); // BEGIN lock - while (_owner == nullptr && !(finished = _pool->isClosed())) { - _dispatchedCond.wait(dispatchedGuard); - } - - dispatchedGuard.unlock(); // END lock - - if(!finished) { - deleteOnCompletion = _owner->DeleteOnCompletion(); - _owner->Run(this, _startArg); - - dispatchedGuard.lock(); // BEGIN lock - - if (deleteOnCompletion) { - delete _owner; - } - _owner = nullptr; - _startArg = nullptr; - _breakFlag.store(false, std::memory_order_relaxed); - finished = _pool->isClosed(); - - dispatchedGuard.unlock(); // END lock - - { - std::lock_guard runningGuard(_runningMutex); - _runningFlag = false; - _runningCond.notify_all(); - } - - _pool->FreeThread(this); - // printf("Thread given back to FastOS_ThreadPool: %p\n", this); - } - } - - _pool->ThreadIsAboutToTerminate(this); - - // Be sure not to touch any members from here on, as we are about - // to be deleted. -} - - -// Make this thread call parameter fcn with parameters argh -// when this becomes active. -// Restriction: _liveCond must be held by the caller. - -void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg) -{ - std::lock_guard dispatchedGuard(_dispatchedMutex); - - { - std::unique_lock runningGuard(_runningMutex); - while (_runningFlag) { - _runningCond.wait(runningGuard); - } - _runningFlag = true; - } - - _owner = newOwner; - _startArg = arg; - // Set _thread variable before NewThread returns - _owner->_thread.store(this, std::memory_order_release); - - // It is safe to signal after the unlock since _liveCond is still held - // so the signalled thread still exists. - // However as thread creation is infrequent and as helgrind suggest doing - // it the safe way we just do that, instead of keeping a unneccessary long - // suppressionslist. It will be long enough anyway. - - _dispatchedCond.notify_one(); -} - -void FastOS_ThreadInterface::SetBreakFlag() -{ - std::lock_guard dispatchedGuard(_dispatchedMutex); - _breakFlag.store(true, std::memory_order_relaxed); - _dispatchedCond.notify_one(); -} - - -FastOS_ThreadInterface *FastOS_ThreadInterface::CreateThread(FastOS_ThreadPool *pool) -{ - FastOS_ThreadInterface *thread = new FastOS_Thread(pool); - - if(!thread->Initialize()) { - delete(thread); - thread = nullptr; - } - - return thread; -} - -void FastOS_ThreadInterface::Join () -{ - std::unique_lock runningGuard(_runningMutex); - while (_runningFlag) { - _runningCond.wait(runningGuard); - } -} - - -// ---------------------------------------------------------------------- -// FastOS_Runnable -// ---------------------------------------------------------------------- - -FastOS_Runnable::FastOS_Runnable() - : _thread(nullptr) -{ -} - -FastOS_Runnable::~FastOS_Runnable() -{ - // assert(_thread == nullptr); -} diff --git a/fastos/src/vespa/fastos/thread.h b/fastos/src/vespa/fastos/thread.h deleted file mode 100644 index 2fb717403f2..00000000000 --- a/fastos/src/vespa/fastos/thread.h +++ /dev/null @@ -1,467 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -//************************************************************************ -/** - * @file - * Class definitions for FastOS_ThreadPool, FastOS_ThreadInterface and - * FastOS_Runnable. - * - * @author Oivind H. Danielsen - */ - -#pragma once - - -#include "types.h" -#include -#include -#include - -typedef pthread_t FastOS_ThreadId; - -class FastOS_Runnable; -class FastOS_ThreadInterface; - - -/** - * This class implements an initially empty pool of threads. - * - * As threads are allocated with @ref NewThread() the number of - * threads in the pool increases. A maximum number of threads - * contained in the pool can be set using the constructor - * FastOS_ThreadPool(int maxThreads). - * - * Threads are automatically returned to the pool when they - * terminate. - */ -class FastOS_ThreadPool -{ - friend class FastOS_ThreadInterface; - -private: - int _startedThreadsCount; - std::mutex _closeFlagMutex; - bool _closeCalledFlag; - - // Always lock in this order - mutable std::mutex _freeMutex; - std::mutex _liveMutex; - std::condition_variable _liveCond; - /** - * List of free (available) threads. - */ - FastOS_ThreadInterface *_freeThreads; - - /** - * List of active (allocated) threads. - */ - FastOS_ThreadInterface *_activeThreads; - - /** - * Number of available threads in the threadpool. - * Total number of threads = free + active - */ - int _numFree; - - /** - * Number of active threads in the threadpool. - * Total number of threads = free + active - */ - int _numActive; - - /** - * Number of threads that have terminated - */ - int _numTerminated; - - /** - * Number of threads that have not been terminated - */ - int _numLive; - - /** - * Maximum number of threads in the threadpool. A value of - * zero means that there is no limit. - */ - int _maxThreads; - - /** - * Put this thread on the @ref _activeThreads list. - */ - void ActivateThread (FastOS_ThreadInterface *thread); - - /** - * Return previously active thread to the list of free thread. - */ - void FreeThread (FastOS_ThreadInterface *thread); - - /** - * A thread is informing the thread pool that it is about to - * terminate. - */ - void ThreadIsAboutToTerminate(FastOS_ThreadInterface *thread); - - /** - * Set the break flag on all threads. - */ - void BreakThreads (); - - /** - * Wait for all threads to finish. - */ - void JoinThreads (); - - /** - * Delete all threads in threadpool. - */ - void DeleteThreads (); - - /** - * Remove a thread from a list. - */ - void LinkOutThread (FastOS_ThreadInterface *thread, - FastOS_ThreadInterface **listHead); - - /** - * Add a thread to a list. Notice that a thread can be on only one - * list at a time. - */ - void LinkInThread (FastOS_ThreadInterface *thread, - FastOS_ThreadInterface **listHead); - -public: - FastOS_ThreadPool(const FastOS_ThreadPool&) = delete; - FastOS_ThreadPool& operator=(const FastOS_ThreadPool&) = delete; - FastOS_ThreadPool(int maxThreads); - /// Unlimited threads - FastOS_ThreadPool(); - - /** - * Destructor. Closes pool if necessary. - */ - virtual ~FastOS_ThreadPool(); - - - /** - * Allocate a new thread, and make this thread invoke the Run() method - * of the @ref FastOS_Runnable object [owner] with parameters [arg]. - * The thread is automatically freed (returned to the treadpool) - * when Run() returns. - * - * @param owner Instance to be invoked by new thread. - * @param arg Arguments to be passed to new thread. - * - * @return Pointer to newly created thread or nullptr on failure. - */ - FastOS_ThreadInterface *NewThread (FastOS_Runnable *owner, void *arg=nullptr); - - /** - * Close the threadpool. This involves setting the break flag on - * all active threads, and waiting for them to finish. Once Close - * is called, no more threads can be allocated from the thread - * pool. There exists no way to reopen a closed threadpool. - */ - void Close (); - - /** - * This will tell if the pool has been closed. - */ - bool isClosed(); - - /** - * Get the number of currently active threads. - * The total number of actual allocated threads is the sum of - * @ref GetNumActiveThreads() and @ref GetNumInactiveThreads(). - * @return Number of currently active threads - */ - int GetNumActiveThreads () const { - std::lock_guard guard(_freeMutex); - return _numActive; - } - - /** - * Get the number of currently inactive threads. - * The total number of actual allocated threads is the sum of - * @ref GetNumActiveThreads() and @ref GetNumInactiveThreads(). - * @return Number of currently inactive threads - */ - int GetNumInactiveThreads () const { - std::lock_guard guard(_freeMutex); - return _numFree; - } - - /** - * Get the number of started threads since instantiation of the thread pool. - * @return Number of threads started - */ - int GetNumStartedThreads () const { return _startedThreadsCount; } -}; - - -// Operating system thread entry point -extern "C" { - void *FastOS_ThreadHook (void *arg); -} - -/** - * This class controls each operating system thread. - * - * In most cases you would not want to create objects of this class - * directly. Use @ref FastOS_ThreadPool::NewThread() instead. - */ -class FastOS_ThreadInterface -{ - friend class FastOS_ThreadPool; - friend void *FastOS_ThreadHook (void *arg); - -private: - FastOS_ThreadInterface(const FastOS_ThreadInterface&); - FastOS_ThreadInterface& operator=(const FastOS_ThreadInterface&); - -protected: - /** - * The thread does not start (call @ref FastOS_Runnable::Run()) - * until this event has been triggered. - */ - std::mutex _dispatchedMutex; - std::condition_variable _dispatchedCond; - - FastOS_ThreadInterface *_next; - FastOS_ThreadInterface *_prev; - - /** - * A pointer to the instance which implements the interface - * @ref FastOS_Runnable. - */ - FastOS_Runnable *_owner; - - /** - * A pointer to the originating @ref FastOS_ThreadPool - */ - FastOS_ThreadPool *_pool; - - /** - * Entry point for the OS thread. The thread will sleep here - * until dispatched. - */ - void Hook (); - - /** - * Signals that thread should be dispatched. - * @param owner Instance of @ref FastOS_Runnable. - * @param arg Thread invocation arguments. - */ - void Dispatch (FastOS_Runnable *owner, void *arg); - - /** - * Initializes a thread. This includes creating the operating system - * thread handle and setting it up and making it ready to be dispatched. - * @return Boolean success/failure - */ - virtual bool Initialize ()=0; - - /** - * Used to store thread invocation arguments. These are passed along - * to @ref FastOS_Runnable::Run() when the thread is dispatched. - */ - void *_startArg; - - /** - * Create an operating system thread. In most cases you would want - * to create threads using @ref FastOS_ThreadPool::NewThread() instead. - * @param pool The threadpool which is about to contain the new thread. - * @return A new @ref FastOS_Thread or nullptr on failure. - */ - static FastOS_ThreadInterface *CreateThread(FastOS_ThreadPool *pool); - - /** - * Break flag. If true, the thread should exit. - */ - std::atomic _breakFlag; - - /** - * Is this thread active or free in the threadpool? - */ - bool _active; - - /** - * Is the thread running? This is used by @ref Join(), to wait for threads - * to finish. - */ - std::mutex _runningMutex; - std::condition_variable _runningCond; - bool _runningFlag; - -public: - /** - * Constructor. Resets internal attributes. - */ - FastOS_ThreadInterface (FastOS_ThreadPool *pool) - : _dispatchedMutex(), - _dispatchedCond(), - _next(nullptr), - _prev(nullptr), - _owner(nullptr), - _pool(pool), - _startArg(nullptr), - _breakFlag(false), - _active(false), - _runningMutex(), - _runningCond(), - _runningFlag(false) - { - } - - /** - * Destructor. - */ - virtual ~FastOS_ThreadInterface () {} - - /** - * Instruct a thread to exit. This could be used in conjunction with - * @ref GetBreakFlag() in a worker thread, to have cooperative thread - * termination. When a threadpool closes, all threads in the pool will - * have their break flag set. - */ - void SetBreakFlag (); - - /** - * Return the status of this thread's break flag. If the break flag - * is set, someone wants the thread to terminate. It is up to the - * implementor of the thread to decide whether the break flag - * should be used. - * - * In scenarios where a worker thread loops "forever" waiting for - * new jobs, the break flag should be polled in order to eventually - * exit from the loop and terminate the thread. - * - * In scenarios where a worker thread performs a task which - * always should run to completion, the break flag could be ignored - * as the thread sooner or later will terminate. - * - * When a threadpool is closed, the break flag is set on all - * threads in the pool. If a thread loops forever and chooses to - * ignore the break flag, a @ref FastOS_ThreadPool::Close() will - * never finish. (see @ref SetBreakFlag) - */ - bool GetBreakFlag () const - { - return _breakFlag.load(std::memory_order_relaxed); - } - - /** - * Wait for a thread to finish. - */ - void Join (); - - /** - * Returns the id of this thread. - */ - virtual FastOS_ThreadId GetThreadId () const noexcept = 0; -}; - - -/** - * This class gives a generic interface for invoking new threads with an object. - * - * The thread object should inherit this interface (class), and implement - * the @ref Run() method. When @ref FastOS_ThreadPool::NewThread() is - * called, the @ref Run() method of the passed instance will be invoked. - * - * Arguments could be supplied via @ref FastOS_ThreadPool::NewThread(), but - * it is also possible to supply arguments to the new thread through the - * worker thread object constructor or some other attribute-setting method - * prior to creating the thread. Choose whichever method works best for you. - * - * Example: - * @code - * // Arguments passed to the new thread. - * struct MyThreadArgs - * { - * int _something; - * char _tenChars[10]; - * }; - * - * class MyWorkerThread : public FastOS_Runnable - * { - * public: - * - * // Delete this instance upon completion - * virtual bool DeleteOnCompletion() const { return true; } - * - * virtual void Run (FastOS_ThreadInterface *thread, void *arguments) - * { - * MyThreadArgs *args = static_cast(arguments); - * - * // Do some computation... - * Foo(args->_something); - * - * for(int i=0; i<30000; i++) - * { - * ... - * ... - * - * if(thread->GetBreakFlag()) - * break; - * ... - * ... - * - * } - * - * // Thread terminates... - * } - * }; - * - * - * // Example on how to create a thread using the above classes. - * void SomeClass::SomeMethod (FastOS_ThreadPool *pool) - * { - * MyWorkerThread *workerThread = new MyWorkerThread(); - * static MyThreadArgs arguments; - * - * arguments._something = 123456; - * - * // the workerThread instance will be deleted when Run completes - * // see the DeleteOnCompletion doc - * pool->NewThread(workerThread, &arguments); - * } - * @endcode - */ -class FastOS_Runnable -{ -private: - friend class FastOS_ThreadInterface; - std::atomic _thread; - -public: - FastOS_Runnable(const FastOS_Runnable&) = delete; - FastOS_Runnable& operator=(const FastOS_Runnable&) = delete; - FastOS_Runnable(); - virtual ~FastOS_Runnable(); - - /** - * The DeleteOnCompletion method should be overridden to return true - * if the runnable instance should be deleted when run completes - * - * @author Nils Sandoy - * @return true iff this runnable instance should be deleted on completion - */ - virtual bool DeleteOnCompletion() const { return false; } - - /** - * When an object implementing interface @ref FastOS_Runnable is used to - * create a thread, starting the thread causes the object's @ref Run() - * method to be called in that separately executing thread. The thread - * terminates when @ref Run() returns. - * @param thisThread A thread object. - * @param arguments Supplied to @ref FastOS_ThreadPool::NewThread - */ - virtual void Run(FastOS_ThreadInterface *thisThread, void *arguments)=0; - - FastOS_ThreadInterface *GetThread() noexcept { return _thread.load(std::memory_order_acquire); } - const FastOS_ThreadInterface *GetThread() const noexcept { return _thread.load(std::memory_order_acquire); } - bool HasThread() const noexcept { return GetThread() != nullptr; } -}; - -#include -typedef FastOS_UNIX_Thread FASTOS_PREFIX(Thread); - diff --git a/fastos/src/vespa/fastos/unix_thread.cpp b/fastos/src/vespa/fastos/unix_thread.cpp deleted file mode 100644 index 621505b7e02..00000000000 --- a/fastos/src/vespa/fastos/unix_thread.cpp +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "thread.h" - -bool FastOS_UNIX_Thread::Initialize () -{ - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - _handleValid = (0 == pthread_create(&_handle, &attr, FastOS_ThreadHook, this)); - pthread_attr_destroy(&attr); - - return _handleValid; -} - -FastOS_UNIX_Thread::~FastOS_UNIX_Thread() -{ - if (!_handleValid) return; - - void *value = nullptr; - pthread_join(_handle, &value); -} - -FastOS_ThreadId FastOS_UNIX_Thread::GetThreadId () const noexcept -{ - return _handle; -} - -FastOS_ThreadId FastOS_UNIX_Thread::GetCurrentThreadId () -{ - return pthread_self(); -} - -bool FastOS_UNIX_Thread::CompareThreadIds (FastOS_ThreadId a, FastOS_ThreadId b) -{ - return (pthread_equal(a, b) != 0); -} diff --git a/fastos/src/vespa/fastos/unix_thread.h b/fastos/src/vespa/fastos/unix_thread.h deleted file mode 100644 index c3c757e3fd9..00000000000 --- a/fastos/src/vespa/fastos/unix_thread.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** -****************************************************************************** -* @author Oivind H. Danielsen -* @date Creation date: 2000-02-02 -* @file -* Class definition for FastOS_UNIX_Thread -*****************************************************************************/ - -#pragma once - -#include "thread.h" - -class FastOS_UNIX_Thread : public FastOS_ThreadInterface -{ -protected: - pthread_t _handle; - bool _handleValid; - - bool Initialize () override; -public: - FastOS_UNIX_Thread(const FastOS_UNIX_Thread &) = delete; - FastOS_UNIX_Thread& operator=(const FastOS_UNIX_Thread &) = delete; - FastOS_UNIX_Thread(FastOS_ThreadPool *pool) - : FastOS_ThreadInterface(pool), - _handle(), - _handleValid(false) - {} - - ~FastOS_UNIX_Thread() override; - - FastOS_ThreadId GetThreadId () const noexcept override; - static bool CompareThreadIds (FastOS_ThreadId a, FastOS_ThreadId b); - static FastOS_ThreadId GetCurrentThreadId (); -}; - - -- cgit v1.2.3