diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-10-30 13:57:16 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-10-30 13:57:16 +0000 |
commit | 7ae7ce66cadb958f6d722bf8bcb4e06586cbe9c4 (patch) | |
tree | 08271a6dc33a9a3d2cbb1adb53f234f2937ebbaf /fastos/src | |
parent | 1ec2316f01708cb6daf3b5d943ca4d37c8e13854 (diff) |
Use std::mutex and std::condition_variable instead of FastOS_Mutex and
FastOS_Condition.
Diffstat (limited to 'fastos/src')
-rw-r--r-- | fastos/src/tests/job.h | 3 | ||||
-rw-r--r-- | fastos/src/tests/processtest.cpp | 23 | ||||
-rw-r--r-- | fastos/src/tests/typetest.cpp | 3 | ||||
-rw-r--r-- | fastos/src/vespa/fastos/thread.cpp | 109 | ||||
-rw-r--r-- | fastos/src/vespa/fastos/thread.h | 25 | ||||
-rw-r--r-- | fastos/src/vespa/fastos/unix_ipc.cpp | 21 | ||||
-rw-r--r-- | fastos/src/vespa/fastos/unix_process.h | 4 |
7 files changed, 91 insertions, 97 deletions
diff --git a/fastos/src/tests/job.h b/fastos/src/tests/job.h index a5b84fa0f9c..ede0f702627 100644 --- a/fastos/src/tests/job.h +++ b/fastos/src/tests/job.h @@ -2,6 +2,9 @@ #pragma once +#include <vespa/fastos/mutex.h> +#include <vespa/fastos/cond.h> + enum JobCode { PRINT_MESSAGE_AND_WAIT3SEC, diff --git a/fastos/src/tests/processtest.cpp b/fastos/src/tests/processtest.cpp index 11e1307027d..cd6839fd0aa 100644 --- a/fastos/src/tests/processtest.cpp +++ b/fastos/src/tests/processtest.cpp @@ -17,15 +17,14 @@ public: static int _allocCount; static int _successCount; static int _failCount; - static FastOS_Mutex *_counterLock; + static std::mutex *_counterLock; MyListener (const char *title) : _title(title), _receivedBytes(0) { - _counterLock->Lock(); - _allocCount++; - _counterLock->Unlock(); + std::lock_guard<std::mutex> guard(*_counterLock); + _allocCount++; } virtual ~MyListener () @@ -34,14 +33,13 @@ public: const int correctByteCount = 16; - _counterLock->Lock(); + std::lock_guard<std::mutex> guard(*_counterLock); if(_receivedBytes == (isStdout ? correctByteCount : 0)) _successCount++; else _failCount++; _allocCount--; - _counterLock->Unlock(); } void OnReceiveData (const void *data, size_t length) override @@ -62,7 +60,7 @@ public: int MyListener::_allocCount = 0; int MyListener::_successCount = 0; int MyListener::_failCount = 0; -FastOS_Mutex *MyListener::_counterLock = nullptr; +std::mutex *MyListener::_counterLock = nullptr; class ThreadRunJob : public FastOS_Runnable @@ -122,7 +120,7 @@ private: // or not. bool _gotMessage; int _receivedMessages; - FastOS_Mutex *_counterLock; + std::mutex *_counterLock; bool _isChild; public: ProcessTest () @@ -156,9 +154,8 @@ public: // We only have the counter lock if we are the parent process. if(_counterLock != nullptr) { - _counterLock->Lock(); - _receivedMessages++; - _counterLock->Unlock(); + std::lock_guard<std::mutex> guard(*_counterLock); + _receivedMessages++; } } @@ -219,7 +216,7 @@ public: const int numLoops = 100; const int numEachTime = 40; - MyListener::_counterLock = new FastOS_Mutex(); + MyListener::_counterLock = new std::mutex; char testHeader[200]; strcpy(testHeader, "Process Test"); @@ -381,7 +378,7 @@ public: TestHeader ("IPC Test"); const char *childProgram = _argv[1]; - _counterLock = new FastOS_Mutex(); + _counterLock = new std::mutex; int i; for(i=0; i<30; i++) diff --git a/fastos/src/tests/typetest.cpp b/fastos/src/tests/typetest.cpp index 209af305501..503c9a30d24 100644 --- a/fastos/src/tests/typetest.cpp +++ b/fastos/src/tests/typetest.cpp @@ -16,11 +16,8 @@ private: TestHeader("Object Sizes (bytes)"); Progress(true, "FastOS_Application: %d", sizeof(FastOS_Application)); - Progress(true, "FastOS_BoolCond %d", sizeof(FastOS_BoolCond)); - Progress(true, "FastOS_Cond %d", sizeof(FastOS_Cond)); Progress(true, "FastOS_DirectoryScan %d", sizeof(FastOS_DirectoryScan)); Progress(true, "FastOS_File: %d", sizeof(FastOS_File)); - Progress(true, "FastOS_Mutex: %d", sizeof(FastOS_Mutex)); Progress(true, "FastOS_Runnable %d", sizeof(FastOS_Runnable)); Progress(true, "FastOS_ServerSocket %d", sizeof(FastOS_ServerSocket)); Progress(true, "FastOS_Socket: %d", sizeof(FastOS_Socket)); diff --git a/fastos/src/vespa/fastos/thread.cpp b/fastos/src/vespa/fastos/thread.cpp index 3cd3bb4b85b..5e3400b70e3 100644 --- a/fastos/src/vespa/fastos/thread.cpp +++ b/fastos/src/vespa/fastos/thread.cpp @@ -20,6 +20,7 @@ FastOS_ThreadPool::FastOS_ThreadPool(int stackSize, int maxThreads) _stackSize(stackSize), _closeCalledFlag(false), _freeMutex(), + _liveMutex(), _liveCond(), _freeThreads(nullptr), _activeThreads(nullptr), @@ -40,21 +41,20 @@ void FastOS_ThreadPool::ThreadIsAboutToTerminate(FastOS_ThreadInterface *) { assert(isClosed()); - _liveCond.Lock(); + std::lock_guard<std::mutex> guard(_liveMutex); _numTerminated++; _numLive--; - if (_numLive == 0) - _liveCond.Broadcast(); - - _liveCond.Unlock(); + if (_numLive == 0) { + _liveCond.notify_all(); + } } // This is a NOP if the thread isn't active. void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread) { - _freeMutex.Lock(); + std::lock_guard<std::mutex> guard(_freeMutex); if(thread->_active) { LinkOutThread(thread, &_activeThreads); @@ -65,8 +65,6 @@ void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread) LinkInThread(thread, &_freeThreads); _numFree++; } - - _freeMutex.Unlock(); } void FastOS_ThreadPool::LinkOutThread (FastOS_ThreadInterface *thread, FastOS_ThreadInterface **listHead) @@ -110,7 +108,7 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo { FastOS_ThreadInterface *thread=nullptr; - _freeMutex.Lock(); + std::unique_lock<std::mutex> freeGuard(_freeMutex); if (!isClosed()) { if ((thread = _freeThreads) != nullptr) { @@ -126,24 +124,21 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo fprintf(stderr, "Error: Maximum number of threads (%d)" " already allocated.\n", _maxThreads); } else { - _freeMutex.Unlock(); - - _liveCond.Lock(); - _numLive++; - _liveCond.Unlock(); - + freeGuard.unlock(); + { + std::lock_guard<std::mutex> liveGuard(_liveMutex); + _numLive++; + } thread = FastOS_Thread::CreateThread(this); if (thread == nullptr) { - _liveCond.Lock(); + std::lock_guard<std::mutex> liveGuard(_liveMutex); _numLive--; if (_numLive == 0) { - _liveCond.Broadcast(); + _liveCond.notify_all(); } - _liveCond.Unlock(); } - - _freeMutex.Lock(); + freeGuard.lock(); if(thread != nullptr) ActivateThread(thread); @@ -151,11 +146,10 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo } } - _freeMutex.Unlock(); + freeGuard.unlock(); if(thread != nullptr) { - _liveCond.Lock(); + std::lock_guard<std::mutex> liveGuard(_liveMutex); thread->Dispatch(owner, arg); - _liveCond.Unlock(); } return thread; @@ -166,7 +160,7 @@ void FastOS_ThreadPool::BreakThreads () { FastOS_ThreadInterface *thread; - _freeMutex.Lock(); + std::lock_guard<std::mutex> freeGuard(_freeMutex); // Notice all active threads that they should quit for(thread=_activeThreads; thread != nullptr; thread=thread->_next) { @@ -177,26 +171,22 @@ void FastOS_ThreadPool::BreakThreads () for(thread=_freeThreads; thread != nullptr; thread=thread->_next) { thread->SetBreakFlag(); } - - _freeMutex.Unlock(); } void FastOS_ThreadPool::JoinThreads () { - _liveCond.Lock(); - - while (_numLive > 0) - _liveCond.Wait(); - - _liveCond.Unlock(); + std::unique_lock<std::mutex> liveGuard(_liveMutex); + while (_numLive > 0) { + _liveCond.wait(liveGuard); + } } void FastOS_ThreadPool::DeleteThreads () { FastOS_ThreadInterface *thread; - _freeMutex.Lock(); + std::lock_guard<std::mutex> freeGuard(_freeMutex); assert(_numActive == 0); assert(_numLive == 0); @@ -209,30 +199,25 @@ void FastOS_ThreadPool::DeleteThreads () } assert(_numFree == 0); - - _freeMutex.Unlock(); } void FastOS_ThreadPool::Close () { - _closeFlagMutex.Lock(); + std::unique_lock<std::mutex> closeFlagGuard(_closeFlagMutex); if (!_closeCalledFlag) { _closeCalledFlag = true; - _closeFlagMutex.Unlock(); + closeFlagGuard.unlock(); BreakThreads(); JoinThreads(); DeleteThreads(); - } else { - _closeFlagMutex.Unlock(); } } bool FastOS_ThreadPool::isClosed() { - _closeFlagMutex.Lock(); + std::lock_guard<std::mutex> closeFlagGuard(_closeFlagMutex); bool closed(_closeCalledFlag); - _closeFlagMutex.Unlock(); return closed; } @@ -262,20 +247,19 @@ void FastOS_ThreadInterface::Hook () while(!finished) { - _dispatched.Lock(); // BEGIN lock - + std::unique_lock<std::mutex> dispatchedGuard(_dispatchedMutex); // BEGIN lock while (_owner == nullptr && !(finished = _pool->isClosed())) { - _dispatched.Wait(); + _dispatchedCond.wait(dispatchedGuard); } - _dispatched.Unlock(); // END lock + dispatchedGuard.unlock(); // END lock if(!finished) { PreEntry(); deleteOnCompletion = _owner->DeleteOnCompletion(); _owner->Run(this, _startArg); - _dispatched.Lock(); // BEGIN lock + dispatchedGuard.lock(); // BEGIN lock if (deleteOnCompletion) { delete _owner; @@ -285,9 +269,13 @@ void FastOS_ThreadInterface::Hook () _breakFlag = false; finished = _pool->isClosed(); - _dispatched.Unlock(); // END lock + dispatchedGuard.unlock(); // END lock - _runningCond.ClearBusyBroadcast(); + { + std::lock_guard<std::mutex> runningGuard(_runningMutex); + _runningFlag = false; + _runningCond.notify_all(); + } _pool->FreeThread(this); // printf("Thread given back to FastOS_ThreadPool: %p\n", this); @@ -307,9 +295,15 @@ void FastOS_ThreadInterface::Hook () void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg) { - _dispatched.Lock(); + std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex); - _runningCond.SetBusy(); + { + std::unique_lock<std::mutex> runningGuard(_runningMutex); + while (_runningFlag) { + _runningCond.wait(runningGuard); + } + _runningFlag = true; + } _owner = newOwner; _startArg = arg; @@ -322,18 +316,14 @@ void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg) // it the safe way we just do that, instead of keeping a unneccessary long // suppressionslist. It will be long enough anyway. - _dispatched.Signal(); - - _dispatched.Unlock(); + _dispatchedCond.notify_one(); } void FastOS_ThreadInterface::SetBreakFlag() { - _dispatched.Lock(); + std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex); _breakFlag = true; - - _dispatched.Signal(); - _dispatched.Unlock(); + _dispatchedCond.notify_one(); } @@ -351,7 +341,10 @@ FastOS_ThreadInterface *FastOS_ThreadInterface::CreateThread(FastOS_ThreadPool * void FastOS_ThreadInterface::Join () { - _runningCond.WaitBusy(); + std::unique_lock<std::mutex> runningGuard(_runningMutex); + while (_runningFlag) { + _runningCond.wait(runningGuard); + } } diff --git a/fastos/src/vespa/fastos/thread.h b/fastos/src/vespa/fastos/thread.h index eb43fc6b664..2726efe3cf0 100644 --- a/fastos/src/vespa/fastos/thread.h +++ b/fastos/src/vespa/fastos/thread.h @@ -12,8 +12,8 @@ #include "types.h" -#include "mutex.h" -#include "cond.h" +#include <mutex> +#include <condition_variable> typedef pthread_t FastOS_ThreadId; @@ -41,7 +41,7 @@ private: FastOS_ThreadPool& operator=(const FastOS_ThreadPool&); int _startedThreadsCount; - FastOS_Mutex _closeFlagMutex; + std::mutex _closeFlagMutex; /** * The stack size for threads in this pool. */ @@ -49,8 +49,9 @@ private: bool _closeCalledFlag; // Always lock in this order - FastOS_Mutex _freeMutex; - FastOS_Cond _liveCond; + std::mutex _freeMutex; + std::mutex _liveMutex; + std::condition_variable _liveCond; /** * List of free (available) threads. */ @@ -232,7 +233,8 @@ protected: * The thread does not start (call @ref FastOS_Runnable::Run()) * until this event has been triggered. */ - FastOS_Cond _dispatched; + std::mutex _dispatchedMutex; + std::condition_variable _dispatchedCond; FastOS_ThreadInterface *_next; FastOS_ThreadInterface *_prev; @@ -303,7 +305,9 @@ protected: * Is the thread running? This is used by @ref Join(), to wait for threads * to finish. */ - FastOS_BoolCond _runningCond; + std::mutex _runningMutex; + std::condition_variable _runningCond; + bool _runningFlag; public: /** @@ -324,7 +328,8 @@ public: * Constructor. Resets internal attributes. */ FastOS_ThreadInterface (FastOS_ThreadPool *pool) - : _dispatched(), + : _dispatchedMutex(), + _dispatchedCond(), _next(nullptr), _prev(nullptr), _owner(nullptr), @@ -332,7 +337,9 @@ public: _startArg(nullptr), _breakFlag(false), _active(false), - _runningCond() + _runningMutex(), + _runningCond(), + _runningFlag(false) { } diff --git a/fastos/src/vespa/fastos/unix_ipc.cpp b/fastos/src/vespa/fastos/unix_ipc.cpp index b6a28515a5d..79fbe3ee076 100644 --- a/fastos/src/vespa/fastos/unix_ipc.cpp +++ b/fastos/src/vespa/fastos/unix_ipc.cpp @@ -5,6 +5,8 @@ #include <cstring> #include <unistd.h> #include <fcntl.h> +#include <memory> +#include <future> FastOS_UNIX_IPCHelper:: FastOS_UNIX_IPCHelper (FastOS_ApplicationInterface *app, int descriptor) @@ -427,8 +429,7 @@ RemoveClosingProcesses(void) if(!stillBusy) { - if(xproc->_closing != nullptr) - { + if (xproc->_closing) { // We already have the process lock at this point, // so modifying the list is safe. _app->RemoveChildProcess(node); @@ -447,7 +448,8 @@ RemoveClosingProcesses(void) } // The process destructor can now proceed - xproc->_closing->ClearBusy(); + auto closingPromise(std::move(xproc->_closing)); + closingPromise->set_value(); } } } @@ -637,16 +639,11 @@ void FastOS_UNIX_IPCHelper::AddProcess (FastOS_UNIX_Process *xproc) void FastOS_UNIX_IPCHelper::RemoveProcess (FastOS_UNIX_Process *xproc) { - (void)xproc; - - FastOS_BoolCond closeWait; - - closeWait.SetBusy(); - xproc->_closing = &closeWait; - + auto closePromise = std::make_unique<std::promise<void>>(); + auto closeFuture = closePromise->get_future(); + xproc->_closing = std::move(closePromise); NotifyProcessListChange(); - - closeWait.WaitBusy(); + closeFuture.wait(); } void FastOS_UNIX_IPCHelper::DeliverMessages (FastOS_RingBuffer *buffer) diff --git a/fastos/src/vespa/fastos/unix_process.h b/fastos/src/vespa/fastos/unix_process.h index 16614deb1a2..bff5a1d276e 100644 --- a/fastos/src/vespa/fastos/unix_process.h +++ b/fastos/src/vespa/fastos/unix_process.h @@ -12,8 +12,8 @@ #include "app.h" #include <string> #include <memory> +#include <future> -class FastOS_BoolCond; class FastOS_UNIX_RealProcess; class FastOS_RingBuffer; @@ -78,7 +78,7 @@ public: { TYPE_READCOUNT = 3 }; - FastOS_BoolCond *_closing; + std::unique_ptr<std::promise<void>> _closing; FastOS_ProcessRedirectListener *GetListener (DescriptorType type) { if(type == TYPE_STDOUT) |