diff options
author | Harald Musum <musum@yahoo-inc.com> | 2017-10-31 21:18:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-31 21:18:48 +0100 |
commit | 9c182da4cc47f43d4d7fa3d9df18ddd39a34d694 (patch) | |
tree | 3617462ce921e1f94e361142b6f22b38e4bfdbf1 /fastos/src | |
parent | 405bd2337aa06c52262659a3e9ed8e41cdb4a93b (diff) |
Revert "Toregge/use standard locks in fastos"
Diffstat (limited to 'fastos/src')
27 files changed, 689 insertions, 276 deletions
diff --git a/fastos/src/tests/job.h b/fastos/src/tests/job.h index 1d35ec95270..a5b84fa0f9c 100644 --- a/fastos/src/tests/job.h +++ b/fastos/src/tests/job.h @@ -2,9 +2,6 @@ #pragma once -#include <mutex> -#include <condition_variable> - enum JobCode { PRINT_MESSAGE_AND_WAIT3SEC, @@ -31,8 +28,9 @@ private: public: JobCode code; char *message; - std::mutex *mutex; - std::condition_variable *condition; + FastOS_Mutex *mutex; + FastOS_Cond *condition; + FastOS_BoolCond *boolcondition; FastOS_ThreadInterface *otherThread, *ownThread; double *timebuf; double average; @@ -47,6 +45,7 @@ public: message(nullptr), mutex(nullptr), condition(nullptr), + boolcondition(nullptr), otherThread(nullptr), ownThread(nullptr), timebuf(nullptr), diff --git a/fastos/src/tests/processtest.cpp b/fastos/src/tests/processtest.cpp index cd6839fd0aa..11e1307027d 100644 --- a/fastos/src/tests/processtest.cpp +++ b/fastos/src/tests/processtest.cpp @@ -17,14 +17,15 @@ public: static int _allocCount; static int _successCount; static int _failCount; - static std::mutex *_counterLock; + static FastOS_Mutex *_counterLock; MyListener (const char *title) : _title(title), _receivedBytes(0) { - std::lock_guard<std::mutex> guard(*_counterLock); - _allocCount++; + _counterLock->Lock(); + _allocCount++; + _counterLock->Unlock(); } virtual ~MyListener () @@ -33,13 +34,14 @@ public: const int correctByteCount = 16; - std::lock_guard<std::mutex> guard(*_counterLock); + _counterLock->Lock(); if(_receivedBytes == (isStdout ? correctByteCount : 0)) _successCount++; else _failCount++; _allocCount--; + _counterLock->Unlock(); } void OnReceiveData (const void *data, size_t length) override @@ -60,7 +62,7 @@ public: int MyListener::_allocCount = 0; int MyListener::_successCount = 0; int MyListener::_failCount = 0; -std::mutex *MyListener::_counterLock = nullptr; +FastOS_Mutex *MyListener::_counterLock = nullptr; class ThreadRunJob : public FastOS_Runnable @@ -120,7 +122,7 @@ private: // or not. bool _gotMessage; int _receivedMessages; - std::mutex *_counterLock; + FastOS_Mutex *_counterLock; bool _isChild; public: ProcessTest () @@ -154,8 +156,9 @@ public: // We only have the counter lock if we are the parent process. if(_counterLock != nullptr) { - std::lock_guard<std::mutex> guard(*_counterLock); - _receivedMessages++; + _counterLock->Lock(); + _receivedMessages++; + _counterLock->Unlock(); } } @@ -216,7 +219,7 @@ public: const int numLoops = 100; const int numEachTime = 40; - MyListener::_counterLock = new std::mutex; + MyListener::_counterLock = new FastOS_Mutex(); char testHeader[200]; strcpy(testHeader, "Process Test"); @@ -378,7 +381,7 @@ public: TestHeader ("IPC Test"); const char *childProgram = _argv[1]; - _counterLock = new std::mutex; + _counterLock = new FastOS_Mutex(); int i; for(i=0; i<30; i++) diff --git a/fastos/src/tests/thread_bounce_test.cpp b/fastos/src/tests/thread_bounce_test.cpp index 423221d55cb..bf94f3e1aab 100644 --- a/fastos/src/tests/thread_bounce_test.cpp +++ b/fastos/src/tests/thread_bounce_test.cpp @@ -14,10 +14,8 @@ class Thread_Bounce_Test : public ThreadTestBase TestHeader("Bounce Test"); FastOS_ThreadPool pool(128 * 1024); - std::mutex mutex1; - std::condition_variable cond1; - std::mutex mutex2; - std::condition_variable cond2; + FastOS_Cond cond1; + FastOS_Cond cond2; Job job1; Job job2; FastOS_Time checkTime; @@ -30,9 +28,7 @@ class Thread_Bounce_Test : public ThreadTestBase job2.code = BOUNCE_CONDITIONS; job1.otherjob = &job2; job2.otherjob = &job1; - job1.mutex = &mutex1; job1.condition = &cond1; - job2.mutex = &mutex2; job2.condition = &cond2; job1.ownThread = pool.NewThread(this, static_cast<void *>(&job1)); @@ -48,28 +44,28 @@ class Thread_Bounce_Test : public ThreadTestBase left = static_cast<int>(checkTime.MilliSecsToNow()); } - mutex1.lock(); + cond1.Lock(); cnt1 = job1.bouncewakeupcnt; - mutex1.unlock(); - mutex2.lock(); + cond1.Unlock(); + cond2.Lock(); cnt2 = job2.bouncewakeupcnt; - mutex2.unlock(); + cond2.Unlock(); cntsum = cnt1 + cnt2; Progress(lastcntsum != cntsum, "%d bounces", cntsum); lastcntsum = cntsum; } job1.ownThread->SetBreakFlag(); - mutex1.lock(); + cond1.Lock(); job1.bouncewakeup = true; - cond1.notify_one(); - mutex1.unlock(); + cond1.Signal(); + cond1.Unlock(); job2.ownThread->SetBreakFlag(); - mutex2.lock(); + cond2.Lock(); job2.bouncewakeup = true; - cond2.notify_one(); - mutex2.unlock(); + cond2.Signal(); + cond2.Unlock(); pool.Close(); Progress(true, "Pool closed."); diff --git a/fastos/src/tests/thread_joinwait_test.cpp b/fastos/src/tests/thread_joinwait_test.cpp index 7153a05f836..05ab1627334 100644 --- a/fastos/src/tests/thread_joinwait_test.cpp +++ b/fastos/src/tests/thread_joinwait_test.cpp @@ -25,11 +25,11 @@ class Thread_JoinWait_Test : public ThreadTestBase Job jobs[testThreads]; - std::mutex jobMutex; + FastOS_Mutex jobMutex; // The mutex is used to pause the first threads until we have created // the last one. - jobMutex.lock(); + jobMutex.Lock(); for(i=0; i<lastThreadNum; i++) { @@ -68,7 +68,7 @@ class Thread_JoinWait_Test : public ThreadTestBase } } - jobMutex.unlock(); + jobMutex.Unlock(); if((variant & 1) != 0) { diff --git a/fastos/src/tests/thread_mutex_test.cpp b/fastos/src/tests/thread_mutex_test.cpp index d49cf37163d..b8ac575038b 100644 --- a/fastos/src/tests/thread_mutex_test.cpp +++ b/fastos/src/tests/thread_mutex_test.cpp @@ -25,11 +25,10 @@ class Thread_Mutex_Test : public ThreadTestBase { int i; Job jobs[MUTEX_TEST_THREADS]; - std::mutex *myMutex=nullptr; + FastOS_Mutex *myMutex=nullptr; - if(usingMutex) { - myMutex = new std::mutex; - } + if(usingMutex) + myMutex = new FastOS_Mutex(); for(i=0; i<MUTEX_TEST_THREADS; i++) { @@ -118,7 +117,7 @@ class Thread_Mutex_Test : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job job; - std::mutex mtx; + FastOS_Mutex mtx; job.code = HOLD_MUTEX_FOR2SEC; job.result = -1; @@ -136,28 +135,28 @@ class Thread_Mutex_Test : public ThreadTestBase for(int i=0; i<5; i++) { - lockrc = mtx.try_lock(); + lockrc = mtx.TryLock(); Progress(!lockrc, "We should not get the mutex lock just yet (%s)", lockrc ? "got it" : "didn't get it"); if(lockrc) { - mtx.unlock(); + mtx.Unlock(); break; } } FastOS_Thread::Sleep(2000); - lockrc = mtx.try_lock(); + lockrc = mtx.TryLock(); Progress(lockrc, "We should get the mutex lock now (%s)", lockrc ? "got it" : "didn't get it"); if(lockrc) - mtx.unlock(); + mtx.Unlock(); Progress(true, "Attempting to do normal lock..."); - mtx.lock(); + mtx.Lock(); Progress(true, "Got lock. Attempt to do normal unlock..."); - mtx.unlock(); + mtx.Unlock(); Progress(true, "Unlock OK."); } diff --git a/fastos/src/tests/thread_test_base.hpp b/fastos/src/tests/thread_test_base.hpp index 7966e95b369..5305b132d3c 100644 --- a/fastos/src/tests/thread_test_base.hpp +++ b/fastos/src/tests/thread_test_base.hpp @@ -2,17 +2,13 @@ #pragma once -#include <chrono> - static volatile int64_t number; #define INCREASE_NUMBER_AMOUNT 10000 -using namespace std::chrono_literals; - class ThreadTestBase : public BaseTest, public FastOS_Runnable { private: - std::mutex printMutex; + FastOS_Mutex printMutex; public: ThreadTestBase(void) @@ -23,8 +19,9 @@ public: void PrintProgress (char *string) override { - std::lock_guard<std::mutex> guard(printMutex); + printMutex.Lock(); BaseTest::PrintProgress(string); + printMutex.Unlock(); } void Run (FastOS_ThreadInterface *thread, void *arg) override; @@ -96,10 +93,8 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) { int result; - std::unique_lock<std::mutex> guard; - if(job->mutex != nullptr) { - guard = std::unique_lock<std::mutex>(*job->mutex); - } + if(job->mutex != nullptr) + job->mutex->Lock(); result = static_cast<int>(number); @@ -112,7 +107,8 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) FastOS_Thread::Sleep(1000); } - guard = std::unique_lock<std::mutex>(); + if(job->mutex != nullptr) + job->mutex->Unlock(); job->result = result; // This marks the end of the thread @@ -136,23 +132,26 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) case WAIT_FOR_THREAD_TO_FINISH: { - std::unique_lock<std::mutex> guard; - if (job->mutex != nullptr) { - guard = std::unique_lock<std::mutex>(*job->mutex); - } + if(job->mutex) + job->mutex->Lock(); if(job->otherThread != nullptr) job->otherThread->Join(); + if(job->mutex) + job->mutex->Unlock(); break; } case WAIT_FOR_CONDITION: { - std::unique_lock<std::mutex> guard(*job->mutex); + job->condition->Lock(); + job->result = 1; - job->condition->wait(guard); - guard.unlock(); + + job->condition->Wait(); + job->condition->Unlock(); + job->result = 0; break; @@ -161,25 +160,25 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) case BOUNCE_CONDITIONS: { while (!thread->GetBreakFlag()) { - { - std::lock_guard<std::mutex> guard(*job->otherjob->mutex); - job->otherjob->bouncewakeupcnt++; - job->otherjob->bouncewakeup = true; - job->otherjob->condition->notify_one(); - } - std::unique_lock<std::mutex> guard(*job->mutex); - while (!job->bouncewakeup) { - job->condition->wait_for(guard, 1ms); - } - job->bouncewakeup = false; + job->otherjob->condition->Lock(); + job->otherjob->bouncewakeupcnt++; + job->otherjob->bouncewakeup = true; + job->otherjob->condition->Signal(); + job->otherjob->condition->Unlock(); + + job->condition->Lock(); + while (!job->bouncewakeup) + job->condition->TimedWait(1); + job->bouncewakeup = false; + job->condition->Unlock(); } break; } case TEST_ID: { - job->mutex->lock(); // Initially the parent threads owns the lock - job->mutex->unlock(); // It is unlocked when we should start + job->mutex->Lock(); // Initially the parent threads owns the lock + job->mutex->Unlock(); // It is unlocked when we should start FastOS_ThreadId currentId = FastOS_Thread::GetCurrentThreadId(); @@ -193,19 +192,18 @@ void ThreadTestBase::Run (FastOS_ThreadInterface *thread, void *arg) case WAIT2SEC_AND_SIGNALCOND: { FastOS_Thread::Sleep(2000); - job->condition->notify_one(); + job->condition->Signal(); job->result = 1; break; } case HOLD_MUTEX_FOR2SEC: { - { - std::lock_guard<std::mutex> guard(*job->mutex); - FastOS_Thread::Sleep(2000); - } - job->result = 1; - break; + job->mutex->Lock(); + FastOS_Thread::Sleep(2000); + job->mutex->Unlock(); + job->result = 1; + break; } case WAIT_2_SEC: diff --git a/fastos/src/tests/threadtest.cpp b/fastos/src/tests/threadtest.cpp index b0b64697129..81ea234fb97 100644 --- a/fastos/src/tests/threadtest.cpp +++ b/fastos/src/tests/threadtest.cpp @@ -5,18 +5,18 @@ #include "thread_test_base.hpp" #include <vespa/fastos/time.h> #include <cstdlib> -#include <chrono> #define MUTEX_TEST_THREADS 6 #define MAX_THREADS 7 + class ThreadTest : public ThreadTestBase { int Main () override; void WaitForXThreadsToHaveWait (Job *jobs, int jobCount, - std::mutex &mutex, + FastOS_Cond *condition, int numWait) { Progress(true, "Waiting for %d threads to be in wait state", numWait); @@ -26,15 +26,16 @@ class ThreadTest : public ThreadTestBase { int waitingThreads=0; + condition->Lock(); + + for(int i=0; i<jobCount; i++) { - std::lock_guard<std::mutex> guard(mutex); - for(int i=0; i<jobCount; i++) - { - if(jobs[i].result == 1) - waitingThreads++; - } + if(jobs[i].result == 1) + waitingThreads++; } + condition->Unlock(); + if(waitingThreads != oldNumber) Progress(true, "%d threads are waiting", waitingThreads); @@ -322,14 +323,12 @@ class ThreadTest : public ThreadTestBase } void SharedSignalAndBroadcastTest (Job *jobs, int numThreads, - std::mutex *mutex, - std::condition_variable *condition, + FastOS_Cond *condition, FastOS_ThreadPool *pool) { for(int i=0; i<numThreads; i++) { jobs[i].code = WAIT_FOR_CONDITION; - jobs[i].mutex = mutex; jobs[i].condition = condition; jobs[i].ownThread = pool->NewThread(this, static_cast<void *>(&jobs[i])); @@ -339,7 +338,7 @@ class ThreadTest : public ThreadTestBase } WaitForXThreadsToHaveWait (jobs, numThreads, - *mutex, numThreads); + condition, numThreads); // Threads are not guaranteed to have entered sleep yet, // as this test only tests for result code @@ -355,16 +354,15 @@ class ThreadTest : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job jobs[numThreads]; - std::mutex mutex; - std::condition_variable condition; + FastOS_Cond condition; - SharedSignalAndBroadcastTest(jobs, numThreads, &mutex, &condition, &pool); + SharedSignalAndBroadcastTest(jobs, numThreads, &condition, &pool); for(int i=0; i<numThreads; i++) { - condition.notify_one(); + condition.Signal(); WaitForXThreadsToHaveWait(jobs, numThreads, - mutex, numThreads-1-i); + &condition, numThreads-1-i); } Progress(true, "Waiting for threads to finish using pool.Close()..."); @@ -381,13 +379,12 @@ class ThreadTest : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job jobs[numThreads]; - std::mutex mutex; - std::condition_variable condition; + FastOS_Cond condition; - SharedSignalAndBroadcastTest(jobs, numThreads, &mutex, &condition, &pool); + SharedSignalAndBroadcastTest(jobs, numThreads, &condition, &pool); - condition.notify_all(); - WaitForXThreadsToHaveWait(jobs, numThreads, mutex, 0); + condition.Broadcast(); + WaitForXThreadsToHaveWait(jobs, numThreads, &condition, 0); Progress(true, "Waiting for threads to finish using pool.Close()..."); pool.Close(); @@ -404,9 +401,9 @@ class ThreadTest : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job jobs[numThreads]; - std::mutex slowStartMutex; + FastOS_Mutex slowStartMutex; - slowStartMutex.lock(); // Halt all threads until we want them to run + slowStartMutex.Lock(); // Halt all threads until we want them to run for(i=0; i<numThreads; i++) { jobs[i].code = TEST_ID; @@ -431,7 +428,7 @@ class ThreadTest : public ThreadTestBase } } - slowStartMutex.unlock(); // Allow threads to run + slowStartMutex.Unlock(); // Allow threads to run Progress(true, "Waiting for threads to finish using pool.Close()..."); pool.Close(); @@ -452,12 +449,10 @@ class ThreadTest : public ThreadTestBase FastOS_ThreadPool pool(128*1024); Job job; - std::mutex mutex; - std::condition_variable condition; + FastOS_Cond condition; job.code = WAIT2SEC_AND_SIGNALCOND; job.result = -1; - job.mutex = &mutex; job.condition = &condition; job.ownThread = pool.NewThread(this, static_cast<void *>(&job)); @@ -466,17 +461,18 @@ class ThreadTest : public ThreadTestBase if(job.ownThread != nullptr) { - std::unique_lock<std::mutex> guard(mutex); - bool gotCond = condition.wait_for(guard, 500ms) == std::cv_status::no_timeout; + condition.Lock(); + bool gotCond = condition.TimedWait(500); Progress(!gotCond, "We should not get the condition just yet (%s)", gotCond ? "got it" : "didn't get it"); - gotCond = condition.wait_for(guard, 500ms) == std::cv_status::no_timeout; + gotCond = condition.TimedWait(500); Progress(!gotCond, "We should not get the condition just yet (%s)", gotCond ? "got it" : "didn't get it"); - gotCond = condition.wait_for(guard, 5000ms) == std::cv_status::no_timeout; + gotCond = condition.TimedWait(5000); Progress(gotCond, "We should have got the condition now (%s)", gotCond ? "got it" : "didn't get it"); - } + condition.Unlock(); + } Progress(true, "Waiting for threads to finish using pool.Close()..."); pool.Close(); @@ -495,22 +491,31 @@ class ThreadTest : public ThreadTestBase for(i=0; i<allocCount; i++) { - std::mutex *mtx = new std::mutex; - mtx->lock(); - mtx->unlock(); + FastOS_Mutex *mtx = new FastOS_Mutex(); + mtx->Lock(); + mtx->Unlock(); delete mtx; if((i % progressIndex) == (progressIndex - 1)) - Progress(true, "Tested %d std::mutex instances", i + 1); + Progress(true, "Tested %d FastOS_Mutex instances", i + 1); + } + + for(i=0; i<allocCount; i++) + { + FastOS_Cond *cond = new FastOS_Cond(); + delete cond; + + if((i % progressIndex) == (progressIndex - 1)) + Progress(true, "Tested %d FastOS_Cond instances", i+1); } for(i=0; i<allocCount; i++) { - std::condition_variable *cond = new std::condition_variable; + FastOS_BoolCond *cond = new FastOS_BoolCond(); delete cond; if((i % progressIndex) == (progressIndex - 1)) - Progress(true, "Tested %d std::condition_variable instances", i+1); + Progress(true, "Tested %d FastOS_BoolCond instances", i+1); } PrintSeparator(); @@ -523,13 +528,13 @@ class ThreadTest : public ThreadTestBase const int allocCount = 150000; int i; - std::mutex **mutexes = new std::mutex*[allocCount]; + FastOS_Mutex **mutexes = new FastOS_Mutex*[allocCount]; FastOS_Time startTime, nowTime; startTime.SetNow(); for(i=0; i<allocCount; i++) - mutexes[i] = new std::mutex; + mutexes[i] = new FastOS_Mutex(); nowTime.SetNow(); Progress(true, "Allocated %d mutexes at time: %d ms", allocCount, @@ -538,10 +543,10 @@ class ThreadTest : public ThreadTestBase for(int e=0; e<4; e++) { for(i=0; i<allocCount; i++) - mutexes[i]->lock(); + mutexes[i]->Lock(); for(i=0; i<allocCount; i++) - mutexes[i]->unlock(); + mutexes[i]->Unlock(); nowTime.SetNow(); Progress(true, "Tested %d mutexes at time: %d ms", allocCount, diff --git a/fastos/src/tests/typetest.cpp b/fastos/src/tests/typetest.cpp index 503c9a30d24..209af305501 100644 --- a/fastos/src/tests/typetest.cpp +++ b/fastos/src/tests/typetest.cpp @@ -16,8 +16,11 @@ private: TestHeader("Object Sizes (bytes)"); Progress(true, "FastOS_Application: %d", sizeof(FastOS_Application)); + Progress(true, "FastOS_BoolCond %d", sizeof(FastOS_BoolCond)); + Progress(true, "FastOS_Cond %d", sizeof(FastOS_Cond)); Progress(true, "FastOS_DirectoryScan %d", sizeof(FastOS_DirectoryScan)); Progress(true, "FastOS_File: %d", sizeof(FastOS_File)); + Progress(true, "FastOS_Mutex: %d", sizeof(FastOS_Mutex)); Progress(true, "FastOS_Runnable %d", sizeof(FastOS_Runnable)); Progress(true, "FastOS_ServerSocket %d", sizeof(FastOS_ServerSocket)); Progress(true, "FastOS_Socket: %d", sizeof(FastOS_Socket)); diff --git a/fastos/src/vespa/fastos/CMakeLists.txt b/fastos/src/vespa/fastos/CMakeLists.txt index f98e5b8d97b..2a0ff2d370a 100644 --- a/fastos/src/vespa/fastos/CMakeLists.txt +++ b/fastos/src/vespa/fastos/CMakeLists.txt @@ -13,9 +13,11 @@ vespa_add_library(fastos_objects OBJECT time.cpp timestamp.cpp unix_app.cpp + unix_cond.cpp unix_dynamiclibrary.cpp unix_file.cpp unix_ipc.cpp + unix_mutex.cpp unix_process.cpp unix_socket.cpp unix_thread.cpp diff --git a/fastos/src/vespa/fastos/app.cpp b/fastos/src/vespa/fastos/app.cpp index 822683540f7..824d009591f 100644 --- a/fastos/src/vespa/fastos/app.cpp +++ b/fastos/src/vespa/fastos/app.cpp @@ -65,7 +65,7 @@ bool FastOS_ApplicationInterface::Init () if(errorMsg == nullptr) { - _processListMutex = new std::mutex; + _processListMutex = new FastOS_Mutex(); _threadPool = new FastOS_ThreadPool(128 * 1024); rc = true; } diff --git a/fastos/src/vespa/fastos/app.h b/fastos/src/vespa/fastos/app.h index 9560d1ced6a..283db64985c 100644 --- a/fastos/src/vespa/fastos/app.h +++ b/fastos/src/vespa/fastos/app.h @@ -15,7 +15,7 @@ class FastOS_ProcessInterface; class FastOS_ThreadPool; -#include <mutex> +#include <vespa/fastos/mutex.h> /** * FastOS application wrapper class. @@ -143,7 +143,7 @@ protected: FastOS_ThreadPool *_threadPool; FastOS_ProcessInterface *_processList; - std::mutex *_processListMutex; + FastOS_Mutex *_processListMutex; bool _disableLeakReporting; virtual bool PreThreadInit () { return true; } @@ -248,7 +248,8 @@ public: void AddChildProcess (FastOS_ProcessInterface *node); void RemoveChildProcess (FastOS_ProcessInterface *node); - std::unique_lock<std::mutex> getProcessGuard() { return std::unique_lock<std::mutex>(*_processListMutex); } + void ProcessLock () { _processListMutex->Lock(); } + void ProcessUnlock() { _processListMutex->Unlock(); } FastOS_ProcessInterface *GetProcessList () { return _processList; } FastOS_ThreadPool *GetThreadPool (); diff --git a/fastos/src/vespa/fastos/cond.h b/fastos/src/vespa/fastos/cond.h new file mode 100644 index 00000000000..c9405728223 --- /dev/null +++ b/fastos/src/vespa/fastos/cond.h @@ -0,0 +1,165 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +//************************************************************************ +/** + * @file + * Class definitions for FastOS_CondInterface and FastOS_BoolCond. + * + * @author Div, Oivind H. Danielsen + */ + +#pragma once + +#include "mutex.h" + + +/** + * This class implements a synchronization mechanism used by threads to wait + * until a condition expression involving shared data attains a particular state. + * + * Condition variables provide a different type of synchronization + * than locking mechanisms like mutexes. For instance, a mutex is used + * to cause other threads to wait while the thread holding the mutex + * executes code in a critical section. In contrast, a condition + * variable is typically used by a thread to make itself wait until an + * expression involving shared data attains a particular state. + */ +class FastOS_CondInterface : public FastOS_Mutex +{ +public: + FastOS_CondInterface(void) : FastOS_Mutex() { } + + virtual ~FastOS_CondInterface () {} + + /** + * Wait for the condition to be signalled. If the wait takes + * longer than [milliseconds] ms, the wait is aborted and false + * is returned. + * @param milliseconds Max time to wait. + * @return Boolean success/failure + */ + virtual bool TimedWait (int milliseconds) = 0; + + /** + * Wait for the condition to be signalled. + */ + virtual void Wait (void)=0; + + /** + * Send a signal to one thread waiting on the condition (if any). + */ + virtual void Signal (void)=0; + + /** + * Send a signal to all threads waiting on the condition. + */ + virtual void Broadcast (void)=0; +}; + +#include <vespa/fastos/unix_cond.h> +typedef FastOS_UNIX_Cond FASTOS_PREFIX(Cond); + +/** + * This class implements a condition variable with a boolean + * value. + */ +class FastOS_BoolCond : public FastOS_Cond +{ + bool _busy; + +public: + /** + * Constructor. Initially the boolean variable is + * set to non-busy. + */ + FastOS_BoolCond(void) : _busy(false) { } + + ~FastOS_BoolCond(void) { } + + /** + * If the variable is busy, wait for it to be non-busy, + * then set the variable to busy. */ + void SetBusy(void) + { + Lock(); + + while (_busy == true) + Wait(); + + _busy = true; + Unlock(); + } + + /** + * If the variable is busy, wait until it is no longer busy. + * If it was non-busy to begin with, no wait is performed. + */ + void WaitBusy(void) + { + Lock(); + + while (_busy == true) + Wait(); + + Unlock(); + } + + /** + * If the variable is busy, wait until it is no longer busy or a + * timeout occurs. If it was non-busy to begin with, no wait is + * performed. + * @param ms Time to wait + * @return True=non-busy, false=timeout + */ + bool TimedWaitBusy(int ms) + { + bool success = true; + + Lock(); + if (_busy == true) { + success = TimedWait(ms); + } + Unlock(); + + return success; + } + + /** + * Return busy status. + * @return True=busy, false=non-busy + */ + bool PollBusy (void) + { + bool rc; + Lock(); + rc = _busy; + Unlock(); + return rc; + } + + /** + * Set the variable to non-busy, and signal one thread + * waiting (if there are any). + * (if any). + */ + void ClearBusy(void) + { + Lock(); + _busy = false; + Signal(); + Unlock(); + } + + /** + * Set the variable to non-busy, and broadcast to all + * threads waiting (if there are any). + */ + void ClearBusyBroadcast(void) + { + Lock(); + _busy = false; + Broadcast(); + Unlock(); + } +}; + + diff --git a/fastos/src/vespa/fastos/mutex.h b/fastos/src/vespa/fastos/mutex.h new file mode 100644 index 00000000000..530e8d007bc --- /dev/null +++ b/fastos/src/vespa/fastos/mutex.h @@ -0,0 +1,64 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +//************************************************************************ +/** + * @file + * Class definition for FastOS_Mutex. + * + * @author Div, Oivind H. Danielsen + */ + +#pragma once + +#include "types.h" + +/** + * This class defines a mutual-exclusion object. + * + * Facilitates synchronized access to mutual-exclusion zones in the program. + * Before entering code sections where only a single thread at the time can + * operate, use @ref Lock(). If another thread is holding the lock at the + * time, the calling thread will sleep until the current holder of the mutex + * is through using it. + * + * Use @ref Unlock() to release the mutex lock. This will allow other threads + * to obtain the lock. + */ + +class FastOS_MutexInterface +{ +public: + /** + * Destructor + */ + virtual ~FastOS_MutexInterface () {} + + /** + * Obtain an exclusive lock on the mutex. The result of a recursive lock + * is currently undefined. The caller should assume this will result + * in a deadlock situation. + * A recursive lock occurs when a thread, currently owning the lock, + * attempts to lock the mutex a second time. + * + * Use @ref Unlock() to unlock the mutex when done. + */ + virtual void Lock ()=0; + + /** + * Try to obtain an exclusive lock on the mutex. If a lock cannot be + * obtained right away, the method will return false. There will + * be no blocking/waiting for the mutex lock to be available. If + * the mutex was locked in the attempt, true is returned. + * @return Boolean success/failure + */ + virtual bool TryLock ()=0; + + /** + * Unlock a locked mutex. The result of unlocking a mutex not already + * locked by the calling thread is undefined. + */ + virtual void Unlock ()=0; +}; + +#include "unix_mutex.h" +typedef FastOS_UNIX_Mutex FASTOS_PREFIX(Mutex); + diff --git a/fastos/src/vespa/fastos/ringbuffer.h b/fastos/src/vespa/fastos/ringbuffer.h index 41c0af7385b..53ee003915e 100644 --- a/fastos/src/vespa/fastos/ringbuffer.h +++ b/fastos/src/vespa/fastos/ringbuffer.h @@ -32,7 +32,7 @@ private: return (_dataIndex + offset) % _bufferSize; } - std::mutex _mutex; + FastOS_Mutex _mutex; public: void Reset () @@ -128,6 +128,14 @@ public: return _closed; } - std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_mutex); } + void Lock () + { + _mutex.Lock(); + } + + void Unlock () + { + _mutex.Unlock(); + } }; diff --git a/fastos/src/vespa/fastos/socketevent.cpp b/fastos/src/vespa/fastos/socketevent.cpp index 5e542390a53..a80cb015782 100644 --- a/fastos/src/vespa/fastos/socketevent.cpp +++ b/fastos/src/vespa/fastos/socketevent.cpp @@ -7,7 +7,7 @@ FastOS_SocketEventObjects *FastOS_SocketEventObjects::_objects = nullptr; -std::mutex FastOS_SocketEventObjects::_listMutex; +FastOS_Mutex FastOS_SocketEventObjects::_listMutex; int FastOS_SocketEventObjects::_objectCount = 0; bool FastOS_SocketEventObjects::_initialized = false; @@ -55,12 +55,12 @@ bool FastOS_SocketEvent::HandleWakeUp () FastOS_SocketEventObjects *FastOS_SocketEventObjects::ObtainObject (FastOS_SocketEvent *event) { FastOS_SocketEventObjects *node; - std::unique_lock<std::mutex> guard(_listMutex); + _listMutex.Lock(); if(_objects == nullptr) { _objectCount++; - guard.unlock(); + _listMutex.Unlock(); node = new FastOS_SocketEventObjects(event); node->_next = nullptr; @@ -70,6 +70,8 @@ FastOS_SocketEventObjects *FastOS_SocketEventObjects::ObtainObject (FastOS_Socke node = _objects; _objects = node->_next; node->_next = nullptr; + + _listMutex.Unlock(); } return node; @@ -79,7 +81,7 @@ void FastOS_SocketEventObjects::ReleaseObject (FastOS_SocketEventObjects *node) { if (node != nullptr) node->ReleasedCleanup(); - std::lock_guard<std::mutex> guard(_listMutex); + _listMutex.Lock(); if (_initialized) { node->_next = _objects; @@ -88,6 +90,8 @@ void FastOS_SocketEventObjects::ReleaseObject (FastOS_SocketEventObjects *node) delete node; _objectCount--; } + + _listMutex.Unlock(); } @@ -209,14 +213,15 @@ FastOS_SocketEvent::epollFini() void FastOS_SocketEventObjects::InitializeClass(void) { - std::lock_guard<std::mutex> guard(_listMutex); + _listMutex.Lock(); _initialized = true; + _listMutex.Unlock(); } void FastOS_SocketEventObjects::ClassCleanup(void) { - std::lock_guard<std::mutex> guard(_listMutex); + _listMutex.Lock(); _initialized = false; for (;;) { @@ -231,6 +236,7 @@ void FastOS_SocketEventObjects::ClassCleanup(void) _objectCount--; } } + _listMutex.Unlock(); } diff --git a/fastos/src/vespa/fastos/socketevent.h b/fastos/src/vespa/fastos/socketevent.h index 267f948caf9..5e457908ace 100644 --- a/fastos/src/vespa/fastos/socketevent.h +++ b/fastos/src/vespa/fastos/socketevent.h @@ -3,11 +3,11 @@ #pragma once #include "types.h" +#include "mutex.h" #include <poll.h> #include <sys/epoll.h> #include <vector> -#include <mutex> class FastOS_IOEvent { @@ -25,7 +25,7 @@ private: FastOS_SocketEventObjects(const FastOS_SocketEventObjects&); FastOS_SocketEventObjects& operator=(const FastOS_SocketEventObjects&); - static std::mutex _listMutex; + static FastOS_Mutex _listMutex; static int _objectCount; static bool _initialized; diff --git a/fastos/src/vespa/fastos/thread.cpp b/fastos/src/vespa/fastos/thread.cpp index 5e3400b70e3..3cd3bb4b85b 100644 --- a/fastos/src/vespa/fastos/thread.cpp +++ b/fastos/src/vespa/fastos/thread.cpp @@ -20,7 +20,6 @@ FastOS_ThreadPool::FastOS_ThreadPool(int stackSize, int maxThreads) _stackSize(stackSize), _closeCalledFlag(false), _freeMutex(), - _liveMutex(), _liveCond(), _freeThreads(nullptr), _activeThreads(nullptr), @@ -41,20 +40,21 @@ void FastOS_ThreadPool::ThreadIsAboutToTerminate(FastOS_ThreadInterface *) { assert(isClosed()); - std::lock_guard<std::mutex> guard(_liveMutex); + _liveCond.Lock(); _numTerminated++; _numLive--; - if (_numLive == 0) { - _liveCond.notify_all(); - } + if (_numLive == 0) + _liveCond.Broadcast(); + + _liveCond.Unlock(); } // This is a NOP if the thread isn't active. void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread) { - std::lock_guard<std::mutex> guard(_freeMutex); + _freeMutex.Lock(); if(thread->_active) { LinkOutThread(thread, &_activeThreads); @@ -65,6 +65,8 @@ void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread) LinkInThread(thread, &_freeThreads); _numFree++; } + + _freeMutex.Unlock(); } void FastOS_ThreadPool::LinkOutThread (FastOS_ThreadInterface *thread, FastOS_ThreadInterface **listHead) @@ -108,7 +110,7 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo { FastOS_ThreadInterface *thread=nullptr; - std::unique_lock<std::mutex> freeGuard(_freeMutex); + _freeMutex.Lock(); if (!isClosed()) { if ((thread = _freeThreads) != nullptr) { @@ -124,21 +126,24 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo fprintf(stderr, "Error: Maximum number of threads (%d)" " already allocated.\n", _maxThreads); } else { - freeGuard.unlock(); - { - std::lock_guard<std::mutex> liveGuard(_liveMutex); - _numLive++; - } + _freeMutex.Unlock(); + + _liveCond.Lock(); + _numLive++; + _liveCond.Unlock(); + thread = FastOS_Thread::CreateThread(this); if (thread == nullptr) { - std::lock_guard<std::mutex> liveGuard(_liveMutex); + _liveCond.Lock(); _numLive--; if (_numLive == 0) { - _liveCond.notify_all(); + _liveCond.Broadcast(); } + _liveCond.Unlock(); } - freeGuard.lock(); + + _freeMutex.Lock(); if(thread != nullptr) ActivateThread(thread); @@ -146,10 +151,11 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo } } - freeGuard.unlock(); + _freeMutex.Unlock(); if(thread != nullptr) { - std::lock_guard<std::mutex> liveGuard(_liveMutex); + _liveCond.Lock(); thread->Dispatch(owner, arg); + _liveCond.Unlock(); } return thread; @@ -160,7 +166,7 @@ void FastOS_ThreadPool::BreakThreads () { FastOS_ThreadInterface *thread; - std::lock_guard<std::mutex> freeGuard(_freeMutex); + _freeMutex.Lock(); // Notice all active threads that they should quit for(thread=_activeThreads; thread != nullptr; thread=thread->_next) { @@ -171,22 +177,26 @@ void FastOS_ThreadPool::BreakThreads () for(thread=_freeThreads; thread != nullptr; thread=thread->_next) { thread->SetBreakFlag(); } + + _freeMutex.Unlock(); } void FastOS_ThreadPool::JoinThreads () { - std::unique_lock<std::mutex> liveGuard(_liveMutex); - while (_numLive > 0) { - _liveCond.wait(liveGuard); - } + _liveCond.Lock(); + + while (_numLive > 0) + _liveCond.Wait(); + + _liveCond.Unlock(); } void FastOS_ThreadPool::DeleteThreads () { FastOS_ThreadInterface *thread; - std::lock_guard<std::mutex> freeGuard(_freeMutex); + _freeMutex.Lock(); assert(_numActive == 0); assert(_numLive == 0); @@ -199,25 +209,30 @@ void FastOS_ThreadPool::DeleteThreads () } assert(_numFree == 0); + + _freeMutex.Unlock(); } void FastOS_ThreadPool::Close () { - std::unique_lock<std::mutex> closeFlagGuard(_closeFlagMutex); + _closeFlagMutex.Lock(); if (!_closeCalledFlag) { _closeCalledFlag = true; - closeFlagGuard.unlock(); + _closeFlagMutex.Unlock(); BreakThreads(); JoinThreads(); DeleteThreads(); + } else { + _closeFlagMutex.Unlock(); } } bool FastOS_ThreadPool::isClosed() { - std::lock_guard<std::mutex> closeFlagGuard(_closeFlagMutex); + _closeFlagMutex.Lock(); bool closed(_closeCalledFlag); + _closeFlagMutex.Unlock(); return closed; } @@ -247,19 +262,20 @@ void FastOS_ThreadInterface::Hook () while(!finished) { - std::unique_lock<std::mutex> dispatchedGuard(_dispatchedMutex); // BEGIN lock + _dispatched.Lock(); // BEGIN lock + while (_owner == nullptr && !(finished = _pool->isClosed())) { - _dispatchedCond.wait(dispatchedGuard); + _dispatched.Wait(); } - dispatchedGuard.unlock(); // END lock + _dispatched.Unlock(); // END lock if(!finished) { PreEntry(); deleteOnCompletion = _owner->DeleteOnCompletion(); _owner->Run(this, _startArg); - dispatchedGuard.lock(); // BEGIN lock + _dispatched.Lock(); // BEGIN lock if (deleteOnCompletion) { delete _owner; @@ -269,13 +285,9 @@ void FastOS_ThreadInterface::Hook () _breakFlag = false; finished = _pool->isClosed(); - dispatchedGuard.unlock(); // END lock + _dispatched.Unlock(); // END lock - { - std::lock_guard<std::mutex> runningGuard(_runningMutex); - _runningFlag = false; - _runningCond.notify_all(); - } + _runningCond.ClearBusyBroadcast(); _pool->FreeThread(this); // printf("Thread given back to FastOS_ThreadPool: %p\n", this); @@ -295,15 +307,9 @@ void FastOS_ThreadInterface::Hook () void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg) { - std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex); + _dispatched.Lock(); - { - std::unique_lock<std::mutex> runningGuard(_runningMutex); - while (_runningFlag) { - _runningCond.wait(runningGuard); - } - _runningFlag = true; - } + _runningCond.SetBusy(); _owner = newOwner; _startArg = arg; @@ -316,14 +322,18 @@ void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg) // it the safe way we just do that, instead of keeping a unneccessary long // suppressionslist. It will be long enough anyway. - _dispatchedCond.notify_one(); + _dispatched.Signal(); + + _dispatched.Unlock(); } void FastOS_ThreadInterface::SetBreakFlag() { - std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex); + _dispatched.Lock(); _breakFlag = true; - _dispatchedCond.notify_one(); + + _dispatched.Signal(); + _dispatched.Unlock(); } @@ -341,10 +351,7 @@ FastOS_ThreadInterface *FastOS_ThreadInterface::CreateThread(FastOS_ThreadPool * void FastOS_ThreadInterface::Join () { - std::unique_lock<std::mutex> runningGuard(_runningMutex); - while (_runningFlag) { - _runningCond.wait(runningGuard); - } + _runningCond.WaitBusy(); } diff --git a/fastos/src/vespa/fastos/thread.h b/fastos/src/vespa/fastos/thread.h index 2726efe3cf0..eb43fc6b664 100644 --- a/fastos/src/vespa/fastos/thread.h +++ b/fastos/src/vespa/fastos/thread.h @@ -12,8 +12,8 @@ #include "types.h" -#include <mutex> -#include <condition_variable> +#include "mutex.h" +#include "cond.h" typedef pthread_t FastOS_ThreadId; @@ -41,7 +41,7 @@ private: FastOS_ThreadPool& operator=(const FastOS_ThreadPool&); int _startedThreadsCount; - std::mutex _closeFlagMutex; + FastOS_Mutex _closeFlagMutex; /** * The stack size for threads in this pool. */ @@ -49,9 +49,8 @@ private: bool _closeCalledFlag; // Always lock in this order - std::mutex _freeMutex; - std::mutex _liveMutex; - std::condition_variable _liveCond; + FastOS_Mutex _freeMutex; + FastOS_Cond _liveCond; /** * List of free (available) threads. */ @@ -233,8 +232,7 @@ protected: * The thread does not start (call @ref FastOS_Runnable::Run()) * until this event has been triggered. */ - std::mutex _dispatchedMutex; - std::condition_variable _dispatchedCond; + FastOS_Cond _dispatched; FastOS_ThreadInterface *_next; FastOS_ThreadInterface *_prev; @@ -305,9 +303,7 @@ protected: * Is the thread running? This is used by @ref Join(), to wait for threads * to finish. */ - std::mutex _runningMutex; - std::condition_variable _runningCond; - bool _runningFlag; + FastOS_BoolCond _runningCond; public: /** @@ -328,8 +324,7 @@ public: * Constructor. Resets internal attributes. */ FastOS_ThreadInterface (FastOS_ThreadPool *pool) - : _dispatchedMutex(), - _dispatchedCond(), + : _dispatched(), _next(nullptr), _prev(nullptr), _owner(nullptr), @@ -337,9 +332,7 @@ public: _startArg(nullptr), _breakFlag(false), _active(false), - _runningMutex(), - _runningCond(), - _runningFlag(false) + _runningCond() { } diff --git a/fastos/src/vespa/fastos/unix_app.cpp b/fastos/src/vespa/fastos/unix_app.cpp index c60035aa5ab..7682b2d5b8f 100644 --- a/fastos/src/vespa/fastos/unix_app.cpp +++ b/fastos/src/vespa/fastos/unix_app.cpp @@ -162,13 +162,9 @@ void FastOS_UNIX_Application::Cleanup () _ipcHelper->Exit(); if (_processStarter != nullptr) { - { - std::unique_lock<std::mutex> guard; - if (_processListMutex) { - guard = getProcessGuard(); - } - _processStarter->Stop(); - } + if (_processListMutex) ProcessLock(); + _processStarter->Stop(); + if (_processListMutex) ProcessUnlock(); delete _processStarter; _processStarter = nullptr; } diff --git a/fastos/src/vespa/fastos/unix_cond.cpp b/fastos/src/vespa/fastos/unix_cond.cpp new file mode 100644 index 00000000000..5eb1f5b0218 --- /dev/null +++ b/fastos/src/vespa/fastos/unix_cond.cpp @@ -0,0 +1,49 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "cond.h" +#include <sys/time.h> +#include <cstdint> + +FastOS_UNIX_Cond::FastOS_UNIX_Cond(void) + : FastOS_CondInterface(), + _cond() +{ + pthread_cond_init(&_cond, nullptr); +} + +FastOS_UNIX_Cond::~FastOS_UNIX_Cond(void) +{ + pthread_cond_destroy(&_cond); +} + +void +FastOS_UNIX_Cond::Wait(void) +{ + pthread_cond_wait(&_cond, &_mutex); +} + +bool +FastOS_UNIX_Cond::TimedWait(int milliseconds) +{ + + struct timeval currentTime; + struct timespec absTime; + int error; + + gettimeofday(¤tTime, nullptr); + + int64_t ns = (static_cast<int64_t>(currentTime.tv_sec) * + static_cast<int64_t>(1000 * 1000 * 1000) + + static_cast<int64_t>(currentTime.tv_usec) * + static_cast<int64_t>(1000) + + static_cast<int64_t>(milliseconds) * + static_cast<int64_t>(1000 * 1000)); + + absTime.tv_sec = static_cast<int> + (ns / static_cast<int64_t>(1000 * 1000 * 1000)); + absTime.tv_nsec = static_cast<int> + (ns % static_cast<int64_t>(1000 * 1000 * 1000)); + + error = pthread_cond_timedwait(&_cond, &_mutex, &absTime); + return error == 0; +} diff --git a/fastos/src/vespa/fastos/unix_cond.h b/fastos/src/vespa/fastos/unix_cond.h new file mode 100644 index 00000000000..7367d812959 --- /dev/null +++ b/fastos/src/vespa/fastos/unix_cond.h @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +//************************************************************************ +/** + * Class definition and implementation for FastOS_UNIX_Cond. + * + * @author Div, Oivind H. Danielsen + */ + +#pragma once + +#include <vespa/fastos/cond.h> + + +class FastOS_UNIX_Cond : public FastOS_CondInterface +{ +private: + FastOS_UNIX_Cond(const FastOS_UNIX_Cond &); + FastOS_UNIX_Cond& operator=(const FastOS_UNIX_Cond &); + + pthread_cond_t _cond; + +public: + FastOS_UNIX_Cond (); + + ~FastOS_UNIX_Cond(); + + void Wait() override; + + bool TimedWait(int milliseconds) override; + + void Signal() override + { + pthread_cond_signal(&_cond); + } + + void Broadcast() override + { + pthread_cond_broadcast(&_cond); + } +}; + + diff --git a/fastos/src/vespa/fastos/unix_ipc.cpp b/fastos/src/vespa/fastos/unix_ipc.cpp index 79fbe3ee076..695d395674f 100644 --- a/fastos/src/vespa/fastos/unix_ipc.cpp +++ b/fastos/src/vespa/fastos/unix_ipc.cpp @@ -5,8 +5,6 @@ #include <cstring> #include <unistd.h> #include <fcntl.h> -#include <memory> -#include <future> FastOS_UNIX_IPCHelper:: FastOS_UNIX_IPCHelper (FastOS_ApplicationInterface *app, int descriptor) @@ -57,7 +55,7 @@ DoWrite(FastOS_UNIX_Process::DescriptorHandle &desc) bool rc = true; FastOS_RingBuffer *buffer = desc._writeBuffer.get(); - auto bufferGuard = buffer->getGuard(); + buffer->Lock(); int writeBytes = buffer->GetReadSpace(); if(writeBytes > 0) { @@ -80,6 +78,8 @@ DoWrite(FastOS_UNIX_Process::DescriptorHandle &desc) else if(bytesWritten == 0) desc.CloseHandle(); } + buffer->Unlock(); + return rc; } @@ -90,7 +90,7 @@ DoRead (FastOS_UNIX_Process::DescriptorHandle &desc) FastOS_RingBuffer *buffer = desc._readBuffer.get(); - auto bufferGuard = buffer->getGuard(); + buffer->Lock(); int readBytes = buffer->GetWriteSpace(); if(readBytes > 0) { int bytesRead; @@ -108,6 +108,7 @@ DoRead (FastOS_UNIX_Process::DescriptorHandle &desc) desc.CloseHandle(); } } + buffer->Unlock(); return rc; } @@ -429,7 +430,8 @@ RemoveClosingProcesses(void) if(!stillBusy) { - if (xproc->_closing) { + if(xproc->_closing != nullptr) + { // We already have the process lock at this point, // so modifying the list is safe. _app->RemoveChildProcess(node); @@ -448,8 +450,7 @@ RemoveClosingProcesses(void) } // The process destructor can now proceed - auto closingPromise(std::move(xproc->_closing)); - closingPromise->set_value(); + xproc->_closing->ClearBusy(); } } } @@ -473,32 +474,31 @@ Run(FastOS_ThreadInterface *thisThread, void *arg) for(;;) { // Deliver messages to from child processes and parent. + _app->ProcessLock(); + for(node = _app->GetProcessList(); node != nullptr; node = node->_next) { - auto guard = _app->getProcessGuard(); - for(node = _app->GetProcessList(); node != nullptr; node = node->_next) - { - FastOS_UNIX_Process *xproc = static_cast<FastOS_UNIX_Process *>(node); - FastOS_UNIX_Process::DescriptorHandle &desc = - xproc->GetDescriptorHandle(FastOS_UNIX_Process::TYPE_IPC); - DeliverMessages(desc._readBuffer.get()); - PipeData(xproc, FastOS_UNIX_Process::TYPE_STDOUT); - PipeData(xproc, FastOS_UNIX_Process::TYPE_STDERR); - } - DeliverMessages(_appParentIPCDescriptor._readBuffer.get()); + FastOS_UNIX_Process *xproc = static_cast<FastOS_UNIX_Process *>(node); + FastOS_UNIX_Process::DescriptorHandle &desc = + xproc->GetDescriptorHandle(FastOS_UNIX_Process::TYPE_IPC); + DeliverMessages(desc._readBuffer.get()); + PipeData(xproc, FastOS_UNIX_Process::TYPE_STDOUT); + PipeData(xproc, FastOS_UNIX_Process::TYPE_STDERR); + } + DeliverMessages(_appParentIPCDescriptor._readBuffer.get()); - // Setup file descriptor sets for the next select() call - BuildPollChecks(); + // Setup file descriptor sets for the next select() call + BuildPollChecks(); - // Close and signal closing processes - RemoveClosingProcesses(); + // Close and signal closing processes + RemoveClosingProcesses(); - BuildPollArray(&fds, &nfds, &allocnfds); - } - bool exitFlag = false; - { - std::lock_guard<std::mutex> guard(_lock); - exitFlag = _exitFlag; - } + BuildPollArray(&fds, &nfds, &allocnfds); + + _app->ProcessUnlock(); + + _lock.Lock(); + bool exitFlag(_exitFlag); + _lock.Unlock(); if (exitFlag) { if (_appParentIPCDescriptor._fd != -1) @@ -546,13 +546,11 @@ Run(FastOS_ThreadInterface *thisThread, void *arg) break; } - bool woken = false; - { - auto guard = _app->getProcessGuard(); - woken = SavePollArray(fds, nfds); - // Do actual IO (based on file descriptor sets and buffer contents) - PerformAsyncIO(); - } + _app->ProcessLock(); + bool woken = SavePollArray(fds, nfds); + // Do actual IO (based on file descriptor sets and buffer contents) + PerformAsyncIO(); + _app->ProcessUnlock(); PerformAsyncIPCIO(); // Did someone want to wake us up from the poll() call? @@ -586,7 +584,7 @@ SendMessage (FastOS_UNIX_Process *xproc, const void *buffer, ipcBuffer = desc._writeBuffer.get(); if(ipcBuffer != nullptr) { - auto ipcBufferGuard = ipcBuffer->getGuard(); + ipcBuffer->Lock(); if(ipcBuffer->GetWriteSpace() >= int((length + sizeof(int)))) { memcpy(ipcBuffer->GetWritePtr(), &length, sizeof(int)); @@ -597,6 +595,7 @@ SendMessage (FastOS_UNIX_Process *xproc, const void *buffer, NotifyProcessListChange(); rc = true; } + ipcBuffer->Unlock(); } return rc; } @@ -612,9 +611,10 @@ void FastOS_UNIX_IPCHelper::NotifyProcessListChange () void FastOS_UNIX_IPCHelper::Exit () { - std::lock_guard<std::mutex> guard(_lock); + _lock.Lock(); _exitFlag = true; NotifyProcessListChange(); + _lock.Unlock(); } void FastOS_UNIX_IPCHelper::AddProcess (FastOS_UNIX_Process *xproc) @@ -639,11 +639,16 @@ void FastOS_UNIX_IPCHelper::AddProcess (FastOS_UNIX_Process *xproc) void FastOS_UNIX_IPCHelper::RemoveProcess (FastOS_UNIX_Process *xproc) { - auto closePromise = std::make_unique<std::promise<void>>(); - auto closeFuture = closePromise->get_future(); - xproc->_closing = std::move(closePromise); + (void)xproc; + + FastOS_BoolCond closeWait; + + closeWait.SetBusy(); + xproc->_closing = &closeWait; + NotifyProcessListChange(); - closeFuture.wait(); + + closeWait.WaitBusy(); } void FastOS_UNIX_IPCHelper::DeliverMessages (FastOS_RingBuffer *buffer) @@ -651,7 +656,7 @@ void FastOS_UNIX_IPCHelper::DeliverMessages (FastOS_RingBuffer *buffer) if(buffer == nullptr) return; - auto bufferGuard = buffer->getGuard(); + buffer->Lock(); unsigned int readSpace; while((readSpace = buffer->GetReadSpace()) > sizeof(int)) @@ -668,6 +673,8 @@ void FastOS_UNIX_IPCHelper::DeliverMessages (FastOS_RingBuffer *buffer) else break; } + + buffer->Unlock(); } void FastOS_UNIX_IPCHelper:: @@ -683,7 +690,7 @@ PipeData (FastOS_UNIX_Process *process, if(listener == nullptr) return; - auto bufferGuard = buffer->getGuard(); + buffer->Lock(); unsigned int readSpace; while((readSpace = buffer->GetReadSpace()) > 0) { @@ -693,4 +700,6 @@ PipeData (FastOS_UNIX_Process *process, if(buffer->GetCloseFlag()) process->CloseListener(type); + + buffer->Unlock(); } diff --git a/fastos/src/vespa/fastos/unix_ipc.h b/fastos/src/vespa/fastos/unix_ipc.h index 218096e2145..35e77e11cb2 100644 --- a/fastos/src/vespa/fastos/unix_ipc.h +++ b/fastos/src/vespa/fastos/unix_ipc.h @@ -13,7 +13,7 @@ private: FastOS_UNIX_IPCHelper& operator=(const FastOS_UNIX_IPCHelper&); protected: - std::mutex _lock; + FastOS_Mutex _lock; volatile bool _exitFlag; FastOS_ApplicationInterface *_app; diff --git a/fastos/src/vespa/fastos/unix_mutex.cpp b/fastos/src/vespa/fastos/unix_mutex.cpp new file mode 100644 index 00000000000..535a39ce592 --- /dev/null +++ b/fastos/src/vespa/fastos/unix_mutex.cpp @@ -0,0 +1,18 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "mutex.h" +#include <cassert> + +FastOS_UNIX_Mutex::FastOS_UNIX_Mutex(void) + : FastOS_MutexInterface(), + _mutex() +{ + int error = pthread_mutex_init(&_mutex, nullptr); + assert(error == 0); + (void) error; +} + +FastOS_UNIX_Mutex::~FastOS_UNIX_Mutex(void) +{ + pthread_mutex_destroy(&_mutex); +} diff --git a/fastos/src/vespa/fastos/unix_mutex.h b/fastos/src/vespa/fastos/unix_mutex.h new file mode 100644 index 00000000000..30150bc1590 --- /dev/null +++ b/fastos/src/vespa/fastos/unix_mutex.h @@ -0,0 +1,44 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** +****************************************************************************** +* @author Oivind H. Danielsen +* @date Creation date: 2000-02-02 +* @file +* Class definition and implementation for FastOS_UNIX_Mutex +*****************************************************************************/ + + + +#pragma once + + +#include "mutex.h" +#include <pthread.h> + +class FastOS_UNIX_Mutex : public FastOS_MutexInterface +{ +private: + FastOS_UNIX_Mutex(const FastOS_UNIX_Mutex &other); + FastOS_UNIX_Mutex & operator = (const FastOS_UNIX_Mutex &other); +protected: + pthread_mutex_t _mutex; + +public: + FastOS_UNIX_Mutex(); + + ~FastOS_UNIX_Mutex(); + + bool TryLock () override { + return pthread_mutex_trylock(&_mutex) == 0; + } + + void Lock() override { + pthread_mutex_lock(&_mutex); + } + + void Unlock() override { + pthread_mutex_unlock(&_mutex); + } +}; + + diff --git a/fastos/src/vespa/fastos/unix_process.cpp b/fastos/src/vespa/fastos/unix_process.cpp index 80ad0605f78..df32cb935ff 100644 --- a/fastos/src/vespa/fastos/unix_process.cpp +++ b/fastos/src/vespa/fastos/unix_process.cpp @@ -805,10 +805,9 @@ FastOS_UNIX_Process (const char *cmdLine, bool pipeStdin, if (stderrListener != nullptr) _descriptor[TYPE_STDERR]._readBuffer.reset(new FastOS_RingBuffer(bufferSize)); - { - auto guard = _app->getProcessGuard(); - _app->AddChildProcess(this); - } + _app->ProcessLock(); + _app->AddChildProcess(this); + _app->ProcessUnlock(); // App::AddToIPCComm() is performed when the process is started } @@ -826,8 +825,9 @@ FastOS_UNIX_Process::~FastOS_UNIX_Process () static_cast<FastOS_UNIX_Application *>(_app)->RemoveFromIPCComm(this); } else { // No IPC descriptor, do it ourselves - auto guard = _app->getProcessGuard(); + _app->ProcessLock(); _app->RemoveChildProcess(this); + _app->ProcessUnlock(); } for(int i=0; i<int(TYPE_COUNT); i++) { @@ -897,7 +897,7 @@ bool FastOS_UNIX_Process::Signal(int sig) bool rc = false; pid_t pid; - auto guard = _app->getProcessGuard(); + _app->ProcessLock(); pid = GetProcessId(); if (pid == 0) { /* Do nothing */ @@ -908,6 +908,7 @@ bool FastOS_UNIX_Process::Signal(int sig) _killed = true; rc = true; } + _app->ProcessUnlock(); return rc; } @@ -1721,7 +1722,7 @@ CreateProcess (FastOS_UNIX_Process *process, const char *cmdLine = process->GetCommandLine(); - auto guard = _app->getProcessGuard(); + process->_app->ProcessLock(); if (process->GetDirectChild()) { _hasDirectChildren = true; @@ -1769,7 +1770,7 @@ CreateProcess (FastOS_UNIX_Process *process, "Forkandexec %s failed\n", cmdLine); } - guard.unlock(); + process->_app->ProcessUnlock(); delete rprocess; FreeEnvironmentVariables(env); return rc; @@ -1846,6 +1847,8 @@ CreateProcess (FastOS_UNIX_Process *process, } } } + process->_app->ProcessUnlock(); + return rc; } @@ -1923,13 +1926,13 @@ FastOS_UNIX_ProcessStarter::Wait(FastOS_UNIX_Process *process, *pollStillRunning = true; for (;;) { - { - auto guard = process->_app->getProcessGuard(); + process->_app->ProcessLock(); - if (_hasDirectChildren) PollReapDirectChildren(); + if (_hasDirectChildren) PollReapDirectChildren(); - if (_hasProxiedChildren) PollReapProxiedChildren(); - } + if (_hasProxiedChildren) PollReapProxiedChildren(); + + process->_app->ProcessUnlock(); if (process->GetDeathFlag()) { if (pollStillRunning != nullptr) @@ -1968,14 +1971,16 @@ bool FastOS_UNIX_ProcessStarter::Detach(FastOS_UNIX_Process *process) bool rc = true; pid_t pid; - auto guard = process->_app->getProcessGuard(); + process->_app->ProcessLock(); pid = process->GetProcessId(); if (pid == 0) { + process->_app->ProcessUnlock(); return false; // Cannot detach nonstarted process. } if (process->GetDeathFlag()) { + process->_app->ProcessUnlock(); return true; } @@ -1999,6 +2004,7 @@ bool FastOS_UNIX_ProcessStarter::Detach(FastOS_UNIX_Process *process) ReadBytes(_mainSocket, &returnCode, sizeof(int)); process->DeathNotification(returnCode); } + process->_app->ProcessUnlock(); return rc; } @@ -2038,4 +2044,4 @@ FastOS_UNIX_Process::DescriptorHandle::CloseHandleDirectChild() close(_fd); _fd = -1; } -} +}
\ No newline at end of file diff --git a/fastos/src/vespa/fastos/unix_process.h b/fastos/src/vespa/fastos/unix_process.h index bff5a1d276e..16614deb1a2 100644 --- a/fastos/src/vespa/fastos/unix_process.h +++ b/fastos/src/vespa/fastos/unix_process.h @@ -12,8 +12,8 @@ #include "app.h" #include <string> #include <memory> -#include <future> +class FastOS_BoolCond; class FastOS_UNIX_RealProcess; class FastOS_RingBuffer; @@ -78,7 +78,7 @@ public: { TYPE_READCOUNT = 3 }; - std::unique_ptr<std::promise<void>> _closing; + FastOS_BoolCond *_closing; FastOS_ProcessRedirectListener *GetListener (DescriptorType type) { if(type == TYPE_STDOUT) |