summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-10-30 13:57:16 +0000
committerTor Egge <Tor.Egge@oath.com>2017-10-30 13:57:16 +0000
commit7ae7ce66cadb958f6d722bf8bcb4e06586cbe9c4 (patch)
tree08271a6dc33a9a3d2cbb1adb53f234f2937ebbaf
parent1ec2316f01708cb6daf3b5d943ca4d37c8e13854 (diff)
Use std::mutex and std::condition_variable instead of FastOS_Mutex and
FastOS_Condition.
-rw-r--r--fastos/src/tests/job.h3
-rw-r--r--fastos/src/tests/processtest.cpp23
-rw-r--r--fastos/src/tests/typetest.cpp3
-rw-r--r--fastos/src/vespa/fastos/thread.cpp109
-rw-r--r--fastos/src/vespa/fastos/thread.h25
-rw-r--r--fastos/src/vespa/fastos/unix_ipc.cpp21
-rw-r--r--fastos/src/vespa/fastos/unix_process.h4
7 files changed, 91 insertions, 97 deletions
diff --git a/fastos/src/tests/job.h b/fastos/src/tests/job.h
index a5b84fa0f9c..ede0f702627 100644
--- a/fastos/src/tests/job.h
+++ b/fastos/src/tests/job.h
@@ -2,6 +2,9 @@
#pragma once
+#include <vespa/fastos/mutex.h>
+#include <vespa/fastos/cond.h>
+
enum JobCode
{
PRINT_MESSAGE_AND_WAIT3SEC,
diff --git a/fastos/src/tests/processtest.cpp b/fastos/src/tests/processtest.cpp
index 11e1307027d..cd6839fd0aa 100644
--- a/fastos/src/tests/processtest.cpp
+++ b/fastos/src/tests/processtest.cpp
@@ -17,15 +17,14 @@ public:
static int _allocCount;
static int _successCount;
static int _failCount;
- static FastOS_Mutex *_counterLock;
+ static std::mutex *_counterLock;
MyListener (const char *title)
: _title(title),
_receivedBytes(0)
{
- _counterLock->Lock();
- _allocCount++;
- _counterLock->Unlock();
+ std::lock_guard<std::mutex> guard(*_counterLock);
+ _allocCount++;
}
virtual ~MyListener ()
@@ -34,14 +33,13 @@ public:
const int correctByteCount = 16;
- _counterLock->Lock();
+ std::lock_guard<std::mutex> guard(*_counterLock);
if(_receivedBytes == (isStdout ? correctByteCount : 0))
_successCount++;
else
_failCount++;
_allocCount--;
- _counterLock->Unlock();
}
void OnReceiveData (const void *data, size_t length) override
@@ -62,7 +60,7 @@ public:
int MyListener::_allocCount = 0;
int MyListener::_successCount = 0;
int MyListener::_failCount = 0;
-FastOS_Mutex *MyListener::_counterLock = nullptr;
+std::mutex *MyListener::_counterLock = nullptr;
class ThreadRunJob : public FastOS_Runnable
@@ -122,7 +120,7 @@ private:
// or not.
bool _gotMessage;
int _receivedMessages;
- FastOS_Mutex *_counterLock;
+ std::mutex *_counterLock;
bool _isChild;
public:
ProcessTest ()
@@ -156,9 +154,8 @@ public:
// We only have the counter lock if we are the parent process.
if(_counterLock != nullptr)
{
- _counterLock->Lock();
- _receivedMessages++;
- _counterLock->Unlock();
+ std::lock_guard<std::mutex> guard(*_counterLock);
+ _receivedMessages++;
}
}
@@ -219,7 +216,7 @@ public:
const int numLoops = 100;
const int numEachTime = 40;
- MyListener::_counterLock = new FastOS_Mutex();
+ MyListener::_counterLock = new std::mutex;
char testHeader[200];
strcpy(testHeader, "Process Test");
@@ -381,7 +378,7 @@ public:
TestHeader ("IPC Test");
const char *childProgram = _argv[1];
- _counterLock = new FastOS_Mutex();
+ _counterLock = new std::mutex;
int i;
for(i=0; i<30; i++)
diff --git a/fastos/src/tests/typetest.cpp b/fastos/src/tests/typetest.cpp
index 209af305501..503c9a30d24 100644
--- a/fastos/src/tests/typetest.cpp
+++ b/fastos/src/tests/typetest.cpp
@@ -16,11 +16,8 @@ private:
TestHeader("Object Sizes (bytes)");
Progress(true, "FastOS_Application: %d", sizeof(FastOS_Application));
- Progress(true, "FastOS_BoolCond %d", sizeof(FastOS_BoolCond));
- Progress(true, "FastOS_Cond %d", sizeof(FastOS_Cond));
Progress(true, "FastOS_DirectoryScan %d", sizeof(FastOS_DirectoryScan));
Progress(true, "FastOS_File: %d", sizeof(FastOS_File));
- Progress(true, "FastOS_Mutex: %d", sizeof(FastOS_Mutex));
Progress(true, "FastOS_Runnable %d", sizeof(FastOS_Runnable));
Progress(true, "FastOS_ServerSocket %d", sizeof(FastOS_ServerSocket));
Progress(true, "FastOS_Socket: %d", sizeof(FastOS_Socket));
diff --git a/fastos/src/vespa/fastos/thread.cpp b/fastos/src/vespa/fastos/thread.cpp
index 3cd3bb4b85b..5e3400b70e3 100644
--- a/fastos/src/vespa/fastos/thread.cpp
+++ b/fastos/src/vespa/fastos/thread.cpp
@@ -20,6 +20,7 @@ FastOS_ThreadPool::FastOS_ThreadPool(int stackSize, int maxThreads)
_stackSize(stackSize),
_closeCalledFlag(false),
_freeMutex(),
+ _liveMutex(),
_liveCond(),
_freeThreads(nullptr),
_activeThreads(nullptr),
@@ -40,21 +41,20 @@ void FastOS_ThreadPool::ThreadIsAboutToTerminate(FastOS_ThreadInterface *)
{
assert(isClosed());
- _liveCond.Lock();
+ std::lock_guard<std::mutex> guard(_liveMutex);
_numTerminated++;
_numLive--;
- if (_numLive == 0)
- _liveCond.Broadcast();
-
- _liveCond.Unlock();
+ if (_numLive == 0) {
+ _liveCond.notify_all();
+ }
}
// This is a NOP if the thread isn't active.
void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread)
{
- _freeMutex.Lock();
+ std::lock_guard<std::mutex> guard(_freeMutex);
if(thread->_active) {
LinkOutThread(thread, &_activeThreads);
@@ -65,8 +65,6 @@ void FastOS_ThreadPool::FreeThread (FastOS_ThreadInterface *thread)
LinkInThread(thread, &_freeThreads);
_numFree++;
}
-
- _freeMutex.Unlock();
}
void FastOS_ThreadPool::LinkOutThread (FastOS_ThreadInterface *thread, FastOS_ThreadInterface **listHead)
@@ -110,7 +108,7 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo
{
FastOS_ThreadInterface *thread=nullptr;
- _freeMutex.Lock();
+ std::unique_lock<std::mutex> freeGuard(_freeMutex);
if (!isClosed()) {
if ((thread = _freeThreads) != nullptr) {
@@ -126,24 +124,21 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo
fprintf(stderr, "Error: Maximum number of threads (%d)"
" already allocated.\n", _maxThreads);
} else {
- _freeMutex.Unlock();
-
- _liveCond.Lock();
- _numLive++;
- _liveCond.Unlock();
-
+ freeGuard.unlock();
+ {
+ std::lock_guard<std::mutex> liveGuard(_liveMutex);
+ _numLive++;
+ }
thread = FastOS_Thread::CreateThread(this);
if (thread == nullptr) {
- _liveCond.Lock();
+ std::lock_guard<std::mutex> liveGuard(_liveMutex);
_numLive--;
if (_numLive == 0) {
- _liveCond.Broadcast();
+ _liveCond.notify_all();
}
- _liveCond.Unlock();
}
-
- _freeMutex.Lock();
+ freeGuard.lock();
if(thread != nullptr)
ActivateThread(thread);
@@ -151,11 +146,10 @@ FastOS_ThreadInterface *FastOS_ThreadPool::NewThread (FastOS_Runnable *owner, vo
}
}
- _freeMutex.Unlock();
+ freeGuard.unlock();
if(thread != nullptr) {
- _liveCond.Lock();
+ std::lock_guard<std::mutex> liveGuard(_liveMutex);
thread->Dispatch(owner, arg);
- _liveCond.Unlock();
}
return thread;
@@ -166,7 +160,7 @@ void FastOS_ThreadPool::BreakThreads ()
{
FastOS_ThreadInterface *thread;
- _freeMutex.Lock();
+ std::lock_guard<std::mutex> freeGuard(_freeMutex);
// Notice all active threads that they should quit
for(thread=_activeThreads; thread != nullptr; thread=thread->_next) {
@@ -177,26 +171,22 @@ void FastOS_ThreadPool::BreakThreads ()
for(thread=_freeThreads; thread != nullptr; thread=thread->_next) {
thread->SetBreakFlag();
}
-
- _freeMutex.Unlock();
}
void FastOS_ThreadPool::JoinThreads ()
{
- _liveCond.Lock();
-
- while (_numLive > 0)
- _liveCond.Wait();
-
- _liveCond.Unlock();
+ std::unique_lock<std::mutex> liveGuard(_liveMutex);
+ while (_numLive > 0) {
+ _liveCond.wait(liveGuard);
+ }
}
void FastOS_ThreadPool::DeleteThreads ()
{
FastOS_ThreadInterface *thread;
- _freeMutex.Lock();
+ std::lock_guard<std::mutex> freeGuard(_freeMutex);
assert(_numActive == 0);
assert(_numLive == 0);
@@ -209,30 +199,25 @@ void FastOS_ThreadPool::DeleteThreads ()
}
assert(_numFree == 0);
-
- _freeMutex.Unlock();
}
void FastOS_ThreadPool::Close ()
{
- _closeFlagMutex.Lock();
+ std::unique_lock<std::mutex> closeFlagGuard(_closeFlagMutex);
if (!_closeCalledFlag) {
_closeCalledFlag = true;
- _closeFlagMutex.Unlock();
+ closeFlagGuard.unlock();
BreakThreads();
JoinThreads();
DeleteThreads();
- } else {
- _closeFlagMutex.Unlock();
}
}
bool FastOS_ThreadPool::isClosed()
{
- _closeFlagMutex.Lock();
+ std::lock_guard<std::mutex> closeFlagGuard(_closeFlagMutex);
bool closed(_closeCalledFlag);
- _closeFlagMutex.Unlock();
return closed;
}
@@ -262,20 +247,19 @@ void FastOS_ThreadInterface::Hook ()
while(!finished) {
- _dispatched.Lock(); // BEGIN lock
-
+ std::unique_lock<std::mutex> dispatchedGuard(_dispatchedMutex); // BEGIN lock
while (_owner == nullptr && !(finished = _pool->isClosed())) {
- _dispatched.Wait();
+ _dispatchedCond.wait(dispatchedGuard);
}
- _dispatched.Unlock(); // END lock
+ dispatchedGuard.unlock(); // END lock
if(!finished) {
PreEntry();
deleteOnCompletion = _owner->DeleteOnCompletion();
_owner->Run(this, _startArg);
- _dispatched.Lock(); // BEGIN lock
+ dispatchedGuard.lock(); // BEGIN lock
if (deleteOnCompletion) {
delete _owner;
@@ -285,9 +269,13 @@ void FastOS_ThreadInterface::Hook ()
_breakFlag = false;
finished = _pool->isClosed();
- _dispatched.Unlock(); // END lock
+ dispatchedGuard.unlock(); // END lock
- _runningCond.ClearBusyBroadcast();
+ {
+ std::lock_guard<std::mutex> runningGuard(_runningMutex);
+ _runningFlag = false;
+ _runningCond.notify_all();
+ }
_pool->FreeThread(this);
// printf("Thread given back to FastOS_ThreadPool: %p\n", this);
@@ -307,9 +295,15 @@ void FastOS_ThreadInterface::Hook ()
void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg)
{
- _dispatched.Lock();
+ std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex);
- _runningCond.SetBusy();
+ {
+ std::unique_lock<std::mutex> runningGuard(_runningMutex);
+ while (_runningFlag) {
+ _runningCond.wait(runningGuard);
+ }
+ _runningFlag = true;
+ }
_owner = newOwner;
_startArg = arg;
@@ -322,18 +316,14 @@ void FastOS_ThreadInterface::Dispatch(FastOS_Runnable *newOwner, void *arg)
// it the safe way we just do that, instead of keeping a unneccessary long
// suppressionslist. It will be long enough anyway.
- _dispatched.Signal();
-
- _dispatched.Unlock();
+ _dispatchedCond.notify_one();
}
void FastOS_ThreadInterface::SetBreakFlag()
{
- _dispatched.Lock();
+ std::lock_guard<std::mutex> dispatchedGuard(_dispatchedMutex);
_breakFlag = true;
-
- _dispatched.Signal();
- _dispatched.Unlock();
+ _dispatchedCond.notify_one();
}
@@ -351,7 +341,10 @@ FastOS_ThreadInterface *FastOS_ThreadInterface::CreateThread(FastOS_ThreadPool *
void FastOS_ThreadInterface::Join ()
{
- _runningCond.WaitBusy();
+ std::unique_lock<std::mutex> runningGuard(_runningMutex);
+ while (_runningFlag) {
+ _runningCond.wait(runningGuard);
+ }
}
diff --git a/fastos/src/vespa/fastos/thread.h b/fastos/src/vespa/fastos/thread.h
index eb43fc6b664..2726efe3cf0 100644
--- a/fastos/src/vespa/fastos/thread.h
+++ b/fastos/src/vespa/fastos/thread.h
@@ -12,8 +12,8 @@
#include "types.h"
-#include "mutex.h"
-#include "cond.h"
+#include <mutex>
+#include <condition_variable>
typedef pthread_t FastOS_ThreadId;
@@ -41,7 +41,7 @@ private:
FastOS_ThreadPool& operator=(const FastOS_ThreadPool&);
int _startedThreadsCount;
- FastOS_Mutex _closeFlagMutex;
+ std::mutex _closeFlagMutex;
/**
* The stack size for threads in this pool.
*/
@@ -49,8 +49,9 @@ private:
bool _closeCalledFlag;
// Always lock in this order
- FastOS_Mutex _freeMutex;
- FastOS_Cond _liveCond;
+ std::mutex _freeMutex;
+ std::mutex _liveMutex;
+ std::condition_variable _liveCond;
/**
* List of free (available) threads.
*/
@@ -232,7 +233,8 @@ protected:
* The thread does not start (call @ref FastOS_Runnable::Run())
* until this event has been triggered.
*/
- FastOS_Cond _dispatched;
+ std::mutex _dispatchedMutex;
+ std::condition_variable _dispatchedCond;
FastOS_ThreadInterface *_next;
FastOS_ThreadInterface *_prev;
@@ -303,7 +305,9 @@ protected:
* Is the thread running? This is used by @ref Join(), to wait for threads
* to finish.
*/
- FastOS_BoolCond _runningCond;
+ std::mutex _runningMutex;
+ std::condition_variable _runningCond;
+ bool _runningFlag;
public:
/**
@@ -324,7 +328,8 @@ public:
* Constructor. Resets internal attributes.
*/
FastOS_ThreadInterface (FastOS_ThreadPool *pool)
- : _dispatched(),
+ : _dispatchedMutex(),
+ _dispatchedCond(),
_next(nullptr),
_prev(nullptr),
_owner(nullptr),
@@ -332,7 +337,9 @@ public:
_startArg(nullptr),
_breakFlag(false),
_active(false),
- _runningCond()
+ _runningMutex(),
+ _runningCond(),
+ _runningFlag(false)
{
}
diff --git a/fastos/src/vespa/fastos/unix_ipc.cpp b/fastos/src/vespa/fastos/unix_ipc.cpp
index b6a28515a5d..79fbe3ee076 100644
--- a/fastos/src/vespa/fastos/unix_ipc.cpp
+++ b/fastos/src/vespa/fastos/unix_ipc.cpp
@@ -5,6 +5,8 @@
#include <cstring>
#include <unistd.h>
#include <fcntl.h>
+#include <memory>
+#include <future>
FastOS_UNIX_IPCHelper::
FastOS_UNIX_IPCHelper (FastOS_ApplicationInterface *app, int descriptor)
@@ -427,8 +429,7 @@ RemoveClosingProcesses(void)
if(!stillBusy)
{
- if(xproc->_closing != nullptr)
- {
+ if (xproc->_closing) {
// We already have the process lock at this point,
// so modifying the list is safe.
_app->RemoveChildProcess(node);
@@ -447,7 +448,8 @@ RemoveClosingProcesses(void)
}
// The process destructor can now proceed
- xproc->_closing->ClearBusy();
+ auto closingPromise(std::move(xproc->_closing));
+ closingPromise->set_value();
}
}
}
@@ -637,16 +639,11 @@ void FastOS_UNIX_IPCHelper::AddProcess (FastOS_UNIX_Process *xproc)
void FastOS_UNIX_IPCHelper::RemoveProcess (FastOS_UNIX_Process *xproc)
{
- (void)xproc;
-
- FastOS_BoolCond closeWait;
-
- closeWait.SetBusy();
- xproc->_closing = &closeWait;
-
+ auto closePromise = std::make_unique<std::promise<void>>();
+ auto closeFuture = closePromise->get_future();
+ xproc->_closing = std::move(closePromise);
NotifyProcessListChange();
-
- closeWait.WaitBusy();
+ closeFuture.wait();
}
void FastOS_UNIX_IPCHelper::DeliverMessages (FastOS_RingBuffer *buffer)
diff --git a/fastos/src/vespa/fastos/unix_process.h b/fastos/src/vespa/fastos/unix_process.h
index 16614deb1a2..bff5a1d276e 100644
--- a/fastos/src/vespa/fastos/unix_process.h
+++ b/fastos/src/vespa/fastos/unix_process.h
@@ -12,8 +12,8 @@
#include "app.h"
#include <string>
#include <memory>
+#include <future>
-class FastOS_BoolCond;
class FastOS_UNIX_RealProcess;
class FastOS_RingBuffer;
@@ -78,7 +78,7 @@ public:
{
TYPE_READCOUNT = 3
};
- FastOS_BoolCond *_closing;
+ std::unique_ptr<std::promise<void>> _closing;
FastOS_ProcessRedirectListener *GetListener (DescriptorType type)
{
if(type == TYPE_STDOUT)