path: root/fastos/src/tests/threadtest.cpp
diff options
Diffstat (limited to 'fastos/src/tests/threadtest.cpp')
1 files changed, 1466 insertions, 0 deletions
diff --git a/fastos/src/tests/threadtest.cpp b/fastos/src/tests/threadtest.cpp
new file mode 100644
index 00000000000..43ef6aef82a
--- /dev/null
+++ b/fastos/src/tests/threadtest.cpp
@@ -0,0 +1,1466 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <stdlib.h>
+#include <vespa/fastos/fastos.h>
+#include "tests.h"
+#define PRI_TIME_COUNT 4096
+enum JobCode
+class Job
+ Job(const Job &);
+ Job &operator=(const Job&);
+ JobCode code;
+ char *message;
+ FastOS_Mutex *mutex;
+ FastOS_Cond *condition;
+ FastOS_BoolCond *boolcondition;
+ FastOS_ThreadInterface *otherThread, *ownThread;
+ double *timebuf;
+ double average;
+ int result;
+ FastOS_Thread::Priority threadPri;
+ FastOS_ThreadId _threadId;
+ Job *otherjob;
+ int bouncewakeupcnt;
+ bool bouncewakeup;
+ Job()
+ : code(NOP),
+ message(NULL),
+ mutex(NULL),
+ condition(NULL),
+ boolcondition(NULL),
+ otherThread(NULL),
+ ownThread(NULL),
+ timebuf(NULL),
+ average(0.0),
+ result(-1),
+ threadPri(FastOS_Thread::PRIORITY_NORMAL),
+ _threadId(),
+ otherjob(NULL),
+ bouncewakeupcnt(0),
+ bouncewakeup(false)
+ {
+ }
+ ~Job()
+ {
+ if(message != NULL)
+ free(message);
+ }
+static volatile int64_t number;
+#define MAX_THREADS 7
+class ThreadTest : public BaseTest, public FastOS_Runnable
+ FastOS_Mutex printMutex;
+ ThreadTest(void)
+ : printMutex()
+ {
+ }
+ virtual ~ThreadTest() {};
+ void PrintProgress (char *string)
+ {
+ printMutex.Lock();
+ BaseTest::PrintProgress(string);
+ printMutex.Unlock();
+ }
+ void Run (FastOS_ThreadInterface *thread, void *arg);
+ void WaitForThreadsToFinish (Job *jobs, int count)
+ {
+ int i;
+ Progress(true, "Waiting for threads to finish...");
+ for(;;)
+ {
+ bool threadsFinished=true;
+ for(i=0; i<count; i++)
+ {
+ if(jobs[i].result == -1)
+ {
+ threadsFinished = false;
+ break;
+ }
+ }
+ FastOS_Thread::Sleep(500);
+ if(threadsFinished)
+ break;
+ }
+ Progress(true, "Threads finished");
+ }
+ void MutexTest (bool usingMutex)
+ {
+ if(usingMutex)
+ TestHeader("Mutex Test");
+ else
+ TestHeader("Not Using Mutex Test");
+ FastOS_ThreadPool *pool = new FastOS_ThreadPool(128*1024, MAX_THREADS);
+ if(Progress(pool != NULL, "Allocating ThreadPool"))
+ {
+ int i;
+ FastOS_Mutex *myMutex=NULL;
+ if(usingMutex)
+ myMutex = new FastOS_Mutex();
+ for(i=0; i<MUTEX_TEST_THREADS; i++)
+ {
+ jobs[i].code = INCREASE_NUMBER;
+ jobs[i].mutex = myMutex;
+ }
+ number = 0;
+ for(i=0; i<MUTEX_TEST_THREADS; i++)
+ {
+ bool rc = (NULL != pool->NewThread(this,
+ static_cast<void *>(&jobs[i])));
+ Progress(rc, "Creating Thread with%s mutex", (usingMutex ? "" : "out"));
+ };
+ WaitForThreadsToFinish(jobs, MUTEX_TEST_THREADS);
+ for(i=0; i<MUTEX_TEST_THREADS; i++)
+ {
+ Progress(true, "Thread returned with resultcode %d", jobs[i].result);
+ }
+ bool wasOk=true;
+ int concurrentHits=0;
+ for(i=0; i<MUTEX_TEST_THREADS; i++)
+ {
+ int result = jobs[i].result;
+ if(usingMutex)
+ {
+ if((result % INCREASE_NUMBER_AMOUNT) != 0)
+ {
+ wasOk = false;
+ Progress(false, "Mutex locking did not work (%d).", result);
+ break;
+ }
+ }
+ else
+ {
+ if((result != 0) &&
+ (result % INCREASE_NUMBER_AMOUNT) == 0)
+ {
+ if((++concurrentHits) == 2)
+ {
+ wasOk = false;
+ Progress(false, "Very unlikely that threads are running "
+ "concurrently (%d)", jobs[i].result);
+ break;
+ }
+ }
+ }
+ }
+ if(wasOk)
+ {
+ if(usingMutex)
+ {
+ Progress(true, "Using the mutex, the returned numbers were alligned.");
+ }
+ else
+ {
+ Progress(true, "Returned numbers were not alligned. "
+ "This was the expected result.");
+ }
+ }
+ if(myMutex != NULL)
+ delete(myMutex);
+ Progress(true, "Closing threadpool...");
+ pool->Close();
+ Progress(true, "Deleting threadpool...");
+ delete(pool);
+ }
+ PrintSeparator();
+ }
+ void TooManyThreadsTest ()
+ {
+ TestHeader("Too Many Threads Test");
+ FastOS_ThreadPool *pool = new FastOS_ThreadPool(128*1024, MAX_THREADS);
+ if(Progress(pool != NULL, "Allocating ThreadPool"))
+ {
+ int i;
+ Job jobs[MAX_THREADS];
+ for(i=0; i<MAX_THREADS; i++)
+ {
+ jobs[i].code = PRINT_MESSAGE_AND_WAIT3SEC;
+ jobs[i].message = static_cast<char *>(malloc(100));
+ sprintf(jobs[i].message, "Thread %d invocation", i+1);
+ }
+ for(i=0; i<MAX_THREADS+1; i++)
+ {
+ if(i==MAX_THREADS)
+ {
+ bool rc = (NULL == pool->NewThread(this,
+ static_cast<void *>(&jobs[0])));
+ Progress(rc, "Creating too many threads should fail.");
+ }
+ else
+ {
+ bool rc = (NULL != pool->NewThread(this,
+ static_cast<void *>(&jobs[i])));
+ Progress(rc, "Creating Thread");
+ }
+ };
+ WaitForThreadsToFinish(jobs, MAX_THREADS);
+ Progress(true, "Verifying result codes...");
+ for(i=0; i<MAX_THREADS; i++)
+ {
+ Progress(jobs[i].result ==
+ static_cast<int>(strlen(jobs[i].message)),
+ "Checking result code from thread (%d==%d)",
+ jobs[i].result, strlen(jobs[i].message));
+ }
+ Progress(true, "Closing threadpool...");
+ pool->Close();
+ Progress(true, "Deleting threadpool...");
+ delete(pool);
+ }
+ PrintSeparator();
+ }
+ void HowManyThreadsTest ()
+ {
+ #define HOW_MAX_THREADS (1024)
+ TestHeader("How Many Threads Test");
+ FastOS_ThreadPool *pool = new FastOS_ThreadPool(128*1024, HOW_MAX_THREADS);
+ if(Progress(pool != NULL, "Allocating ThreadPool"))
+ {
+ int i;
+ Job jobs[HOW_MAX_THREADS];
+ for(i=0; i<HOW_MAX_THREADS; i++)
+ {
+ jobs[i].code = PRINT_MESSAGE_AND_WAIT3SEC;
+ jobs[i].message = static_cast<char *>(malloc(100));
+ sprintf(jobs[i].message, "Thread %d invocation", i+1);
+ }
+ for(i=0; i<HOW_MAX_THREADS; i++)
+ {
+ {
+ bool rc = (NULL == pool->NewThread(this,
+ static_cast<void *>(&jobs[0])));
+ Progress(rc, "Creating too many threads should fail.");
+ }
+ else
+ {
+ bool rc = (NULL != pool->NewThread(this,
+ static_cast<void *>(&jobs[i])));
+ Progress(rc, "Creating Thread");
+ }
+ };
+ WaitForThreadsToFinish(jobs, HOW_MAX_THREADS);
+ Progress(true, "Verifying result codes...");
+ for(i=0; i<HOW_MAX_THREADS; i++)
+ {
+ Progress(jobs[i].result ==
+ static_cast<int>(strlen(jobs[i].message)),
+ "Checking result code from thread (%d==%d)",
+ jobs[i].result, strlen(jobs[i].message));
+ }
+ Progress(true, "Closing threadpool...");
+ pool->Close();
+ Progress(true, "Deleting threadpool...");
+ delete(pool);
+ }
+ PrintSeparator();
+ }
+ void CreateSingleThread ()
+ {
+ TestHeader("Create Single Thread Test");
+ FastOS_ThreadPool *pool = new FastOS_ThreadPool(128*1024);
+ if(Progress(pool != NULL, "Allocating ThreadPool"))
+ {
+ bool rc = (NULL != pool->NewThread(this, NULL));
+ Progress(rc, "Creating Thread");
+ Progress(true, "Sleeping 3 seconds");
+ FastOS_Thread::Sleep(3000);
+ }
+ Progress(true, "Closing threadpool...");
+ pool->Close();
+ Progress(true, "Deleting threadpool...");
+ delete(pool);
+ PrintSeparator();
+ }
+ void CreateSingleThreadAndJoin ()
+ {
+ TestHeader("Create Single Thread And Join Test");
+ FastOS_ThreadPool *pool = new FastOS_ThreadPool(128*1024);
+ if(Progress(pool != NULL, "Allocating ThreadPool"))
+ {
+ Job job;
+ job.code = NOP;
+ job.result = -1;
+ bool rc = (NULL != pool->NewThread(this, &job));
+ Progress(rc, "Creating Thread");
+ WaitForThreadsToFinish(&job, 1);
+ }
+ Progress(true, "Closing threadpool...");
+ pool->Close();
+ Progress(true, "Deleting threadpool...");
+ delete(pool);
+ PrintSeparator();
+ }
+ void ThreadCreatePerformance (bool silent, int count, int outercount)
+ {
+ int i;
+ int j;
+ bool rc;
+ int threadsok;
+ FastOS_Time starttime;
+ FastOS_Time endtime;
+ FastOS_Time usedtime;
+ if (!silent)
+ TestHeader("Thread Create Performance");
+ FastOS_ThreadPool *pool = new FastOS_ThreadPool(128 * 1024);
+ if (!silent)
+ Progress(pool != NULL, "Allocating ThreadPool");
+ if (pool != NULL) {
+ Job *jobs = new Job[count];
+ threadsok = 0;
+ starttime.SetNow();
+ for (i = 0; i < count; i++) {
+ jobs[i].code = SILENTNOP;
+ jobs[i].ownThread = pool->NewThread(this, &jobs[i]);
+ rc = (jobs[i].ownThread != NULL);
+ if (rc)
+ threadsok++;
+ }
+ for (j = 0; j < outercount; j++) {
+ for (i = 0; i < count; i++) {
+ if (jobs[i].ownThread != NULL)
+ jobs[i].ownThread->Join();
+ jobs[i].ownThread = pool->NewThread(this, &jobs[i]);
+ rc = (jobs[i].ownThread != NULL);
+ if (rc)
+ threadsok++;
+ }
+ }
+ for (i = 0; i < count; i++) {
+ if (jobs[i].ownThread != NULL)
+ jobs[i].ownThread->Join();
+ }
+ endtime.SetNow();
+ usedtime = endtime;
+ usedtime -= starttime;
+ if (!silent) {
+ Progress(true, "Used time: %d.%03d",
+ usedtime.GetSeconds(), usedtime.GetMicroSeconds() / 1000);
+ double timeused = usedtime.GetSeconds() +
+ static_cast<double>(usedtime.GetMicroSeconds()) / 1000000.0;
+ ProgressFloat(true, "Threads/s: %6.1f",
+ static_cast<float>(static_cast<double>(threadsok) /
+ timeused));
+ }
+ if (threadsok != ((outercount + 1) * count))
+ Progress(false, "Only started %d of %d threads", threadsok,
+ (outercount + 1) * count);
+ if (!silent)
+ Progress(true, "Closing threadpool...");
+ pool->Close();
+ delete [] jobs;
+ if (!silent)
+ Progress(true, "Deleting threadpool...");
+ delete(pool);
+ if (!silent)
+ PrintSeparator();
+ }
+ }
+ void ClosePoolStability(void) {
+ int i;
+ TestHeader("ThreadPool close stability test");
+ for (i = 0; i < 8000; i++) {
+ // Progress(true, "Creating pool iteration %d", i + 1);
+ ThreadCreatePerformance(true, 2, 1);
+ }
+ PrintSeparator();
+ }
+ void ClosePoolTest ()
+ {
+ TestHeader("Close Pool Test");
+ FastOS_ThreadPool pool(128*1024);
+ const int closePoolThreads=9;
+ Job jobs[closePoolThreads];
+ number = 0;
+ for(int i=0; i<closePoolThreads; i++)
+ {
+ jobs[i].code = INCREASE_NUMBER;
+ bool rc = (NULL != pool.NewThread(this,
+ static_cast<void *>(&jobs[i])));
+ Progress(rc, "Creating Thread %d", i+1);
+ }
+ Progress(true, "Waiting for threads to finish using pool.Close()...");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ void BreakFlagTest ()
+ {
+ TestHeader("BreakFlag Test");
+ FastOS_ThreadPool pool(128*1024);
+ const int breakFlagThreads=4;
+ Job jobs[breakFlagThreads];
+ for(int i=0; i<breakFlagThreads; i++)
+ {
+ jobs[i].code = WAIT_FOR_BREAK_FLAG;
+ bool rc = (NULL != pool.NewThread(this,
+ static_cast<void *>(&jobs[i])));
+ Progress(rc, "Creating Thread %d", i+1);
+ }
+ Progress(true, "Waiting for threads to finish using pool.Close()...");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ void SingleThreadJoinWaitMultipleTest(int variant)
+ {
+ bool rc=false;
+ char testName[300];
+ sprintf(testName, "Single Thread Join Wait Multiple Test %d", variant);
+ TestHeader(testName);
+ FastOS_ThreadPool pool(128*1024);
+ const int testThreads=5;
+ int lastThreadNum = testThreads-1;
+ int i;
+ Job jobs[testThreads];
+ FastOS_Mutex jobMutex;
+ // The mutex is used to pause the first threads until we have created
+ // the last one.
+ jobMutex.Lock();
+ for(i=0; i<lastThreadNum; i++)
+ {
+ jobs[i].code = WAIT_FOR_THREAD_TO_FINISH;
+ jobs[i].mutex = &jobMutex;
+ jobs[i].ownThread = pool.NewThread(this,
+ static_cast<void *>(&jobs[i]));
+ rc = (jobs[i].ownThread != NULL);
+ Progress(rc, "Creating Thread %d", i+1);
+ if(!rc)
+ break;
+ }
+ if(rc)
+ {
+ jobs[lastThreadNum].code = (((variant & 2) != 0) ? NOP : PRINT_MESSAGE_AND_WAIT3SEC);
+ jobs[lastThreadNum].message = strdup("This is the thread that others wait for.");
+ FastOS_ThreadInterface *lastThread;
+ lastThread = pool.NewThread(this,
+ static_cast<void *>
+ (&jobs[lastThreadNum]));
+ rc = (lastThread != NULL);
+ Progress(rc, "Creating last thread");
+ if(rc)
+ {
+ for(i=0; i<lastThreadNum; i++)
+ {
+ jobs[i].otherThread = lastThread;
+ }
+ }
+ }
+ jobMutex.Unlock();
+ if((variant & 1) != 0)
+ {
+ for(i=0; i<lastThreadNum; i++)
+ {
+ Progress(true, "Waiting for thread %d to finish using Join()", i+1);
+ jobs[i].ownThread->Join();
+ Progress(true, "Thread %d finished.", i+1);
+ }
+ }
+ Progress(true, "Closing pool.");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ void WaitForXThreadsToHaveWait (Job *jobs,
+ int jobCount,
+ FastOS_Cond *condition,
+ int numWait)
+ {
+ Progress(true, "Waiting for %d threads to be in wait state", numWait);
+ int oldNumber=-10000;
+ for(;;)
+ {
+ int waitingThreads=0;
+ condition->Lock();
+ for(int i=0; i<jobCount; i++)
+ {
+ if(jobs[i].result == 1)
+ waitingThreads++;
+ }
+ condition->Unlock();
+ if(waitingThreads != oldNumber)
+ Progress(true, "%d threads are waiting", waitingThreads);
+ oldNumber = waitingThreads;
+ if(waitingThreads == numWait)
+ break;
+ FastOS_Thread::Sleep(100);
+ }
+ }
+ void SharedSignalAndBroadcastTest (Job *jobs, int numThreads,
+ FastOS_Cond *condition,
+ FastOS_ThreadPool *pool)
+ {
+ for(int i=0; i<numThreads; i++)
+ {
+ jobs[i].code = WAIT_FOR_CONDITION;
+ jobs[i].condition = condition;
+ jobs[i].ownThread = pool->NewThread(this,
+ static_cast<void *>(&jobs[i]));
+ bool rc=(jobs[i].ownThread != NULL);
+ Progress(rc, "CreatingThread %d", i+1);
+ }
+ WaitForXThreadsToHaveWait (jobs, numThreads,
+ condition, numThreads);
+ // Threads are not guaranteed to have entered sleep yet,
+ // as this test only tests for result code
+ // Wait another second to be sure.
+ FastOS_Thread::Sleep(1000);
+ }
+ void BounceTest(void)
+ {
+ TestHeader("Bounce Test");
+ FastOS_ThreadPool pool(128 * 1024);
+ FastOS_Cond cond1;
+ FastOS_Cond cond2;
+ Job job1;
+ Job job2;
+ FastOS_Time checkTime;
+ int cnt1;
+ int cnt2;
+ int cntsum;
+ int lastcntsum;
+ job1.code = BOUNCE_CONDITIONS;
+ job2.code = BOUNCE_CONDITIONS;
+ job1.otherjob = &job2;
+ job2.otherjob = &job1;
+ job1.condition = &cond1;
+ job2.condition = &cond2;
+ job1.ownThread = pool.NewThread(this, static_cast<void *>(&job1));
+ job2.ownThread = pool.NewThread(this, static_cast<void *>(&job2));
+ lastcntsum = -1;
+ for (int iter = 0; iter < 8; iter++) {
+ checkTime.SetNow();
+ int left = static_cast<int>(checkTime.MilliSecsToNow());
+ while (left < 1000) {
+ FastOS_Thread::Sleep(1000 - left);
+ left = static_cast<int>(checkTime.MilliSecsToNow());
+ }
+ cond1.Lock();
+ cnt1 = job1.bouncewakeupcnt;
+ cond1.Unlock();
+ cond2.Lock();
+ cnt2 = job2.bouncewakeupcnt;
+ cond2.Unlock();
+ cntsum = cnt1 + cnt2;
+ Progress(lastcntsum != cntsum, "%d bounces", cntsum);
+ lastcntsum = cntsum;
+ }
+ job1.ownThread->SetBreakFlag();
+ cond1.Lock();
+ job1.bouncewakeup = true;
+ cond1.Signal();
+ cond1.Unlock();
+ job2.ownThread->SetBreakFlag();
+ cond2.Lock();
+ job2.bouncewakeup = true;
+ cond2.Signal();
+ cond2.Unlock();
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ void SignalTest ()
+ {
+ const int numThreads = 5;
+ TestHeader("Signal Test");
+ FastOS_ThreadPool pool(128*1024);
+ Job jobs[numThreads];
+ FastOS_Cond condition;
+ SharedSignalAndBroadcastTest(jobs, numThreads, &condition, &pool);
+ for(int i=0; i<numThreads; i++)
+ {
+ condition.Signal();
+ WaitForXThreadsToHaveWait(jobs, numThreads,
+ &condition, numThreads-1-i);
+ }
+ Progress(true, "Waiting for threads to finish using pool.Close()...");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ void BroadcastTest ()
+ {
+ TestHeader("Broadcast Test");
+ const int numThreads = 5;
+ FastOS_ThreadPool pool(128*1024);
+ Job jobs[numThreads];
+ FastOS_Cond condition;
+ SharedSignalAndBroadcastTest(jobs, numThreads, &condition, &pool);
+ condition.Broadcast();
+ WaitForXThreadsToHaveWait(jobs, numThreads, &condition, 0);
+ Progress(true, "Waiting for threads to finish using pool.Close()...");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ void PriorityTest (int sign)
+ {
+ if(sign == 1)
+ TestHeader("Priority Test (positive)");
+ else
+ TestHeader("Priority Test (negative)");
+ const int numThreads = 5;
+ FastOS_ThreadPool pool(128*1024);
+ Job jobs[numThreads];
+ int i;
+ FastOS_BoolCond boolcondition;
+ double *timebuf = new double[numThreads * PRI_TIME_COUNT];
+ boolcondition.SetBusy();
+ for(i=0; i<numThreads; i++)
+ {
+ jobs[i].code = PRIORITY_TEST;
+ jobs[i].boolcondition = &boolcondition;
+ jobs[i].timebuf = &timebuf[i*PRI_TIME_COUNT];
+ jobs[i].threadPri = static_cast<FastOS_Thread::Priority>((-2+i)*sign);
+ jobs[i].average = 0;
+ jobs[i].ownThread = pool.NewThread(this,
+ static_cast<void *>(&jobs[i]));
+ bool rc=(jobs[i].ownThread != NULL);
+ Progress(rc, "CreatingThread %d", i+1);
+ }
+ // Start testing
+ Progress(true, "Start testing...");
+ boolcondition.ClearBusyBroadcast();
+ Progress(true, "Waiting for threads to finish using pool.Close()...");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ double firstTime, lastTime, totalTime;
+ firstTime = lastTime = timebuf[0];
+ for(i=0; i<(numThreads * PRI_TIME_COUNT); i++)
+ {
+ double timevalue = timebuf[i];
+ if(timevalue < firstTime)
+ firstTime = timevalue;
+ if(timevalue > lastTime)
+ lastTime = timevalue;
+ int job = (i/PRI_TIME_COUNT);
+ jobs[job].average += timevalue;
+ }
+ totalTime = (lastTime - firstTime);
+ for(i=0; i<numThreads; i++)
+ {
+ const int slen=45;
+ char timestr[slen+1];
+ memset(timestr, ' ', slen);
+ timestr[slen] = '\0';
+ double *p = &timebuf[i*PRI_TIME_COUNT];
+ for(int j=0; j<PRI_TIME_COUNT; j++)
+ {
+ int pos = static_cast<int>
+ (((*p++) - firstTime) * (slen-1) / totalTime);
+ timestr[pos] = '*';
+ }
+ Progress(true, "Pri %2d: %s", jobs[i].threadPri, timestr);
+ }
+ char averagestr[80];
+ averagestr[0] = '\0';
+ for(i=0; i<numThreads; i++)
+ {
+ jobs[i].average /= PRI_TIME_COUNT;
+ // convert average to percent
+ jobs[i].average = (jobs[i].average - firstTime) / totalTime;
+ sprintf(&averagestr[strlen(averagestr)], "%.3f ", jobs[i].average);
+ }
+ Progress(true, "Percentages: %s", averagestr);
+ bool okExecution=true;
+ for(i=0; i<(numThreads-1); i++)
+ {
+ double val1 = jobs[i].average * sign;
+ double val2 = jobs[i+1].average * sign;
+ if(val1 < val2)
+ {
+ okExecution = false;
+ break;
+ }
+ }
+ Progress(okExecution, "Checking order of execution");
+ delete [] timebuf;
+ PrintSeparator();
+ }
+ void ThreadIdTest ()
+ {
+ const int numThreads = 5;
+ int i,j;
+ TestHeader ("Thread Id Test");
+ FastOS_ThreadPool pool(128*1024);
+ Job jobs[numThreads];
+ FastOS_Mutex slowStartMutex;
+ slowStartMutex.Lock(); // Halt all threads until we want them to run
+ for(i=0; i<numThreads; i++) {
+ jobs[i].code = TEST_ID;
+ jobs[i].result = -1;
+ jobs[i]._threadId = 0;
+ jobs[i].mutex = &slowStartMutex;
+ jobs[i].ownThread = pool.NewThread(this,
+ static_cast<void *>(&jobs[i]));
+ bool rc=(jobs[i].ownThread != NULL);
+ if(rc)
+ jobs[i]._threadId = jobs[i].ownThread->GetThreadId();
+ Progress(rc, "CreatingThread %d id:%lu", i+1,
+ (unsigned long)(jobs[i]._threadId));
+ for(j=0; j<i; j++) {
+ if(jobs[j]._threadId == jobs[i]._threadId) {
+ Progress(false,
+ "Two different threads received the "
+ "same thread id (%lu)",
+ (unsigned long)(jobs[i]._threadId));
+ }
+ }
+ }
+ slowStartMutex.Unlock(); // Allow threads to run
+ Progress(true, "Waiting for threads to finish using pool.Close()...");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ for(i=0; i<numThreads; i++) {
+ Progress(jobs[i].result == 1,
+ "Thread %lu: ID comparison (current vs stored)",
+ (unsigned long)(jobs[i]._threadId));
+ }
+ PrintSeparator();
+ }
+ void TimedWaitTest ()
+ {
+ TestHeader("Cond Timed Wait Test");
+ FastOS_ThreadPool pool(128*1024);
+ Job job;
+ FastOS_Cond condition;
+ job.result = -1;
+ job.condition = &condition;
+ job.ownThread = pool.NewThread(this,
+ static_cast<void *>(&job));
+ Progress(job.ownThread !=NULL, "Creating thread");
+ if(job.ownThread != NULL)
+ {
+ condition.Lock();
+ bool gotCond = condition.TimedWait(500);
+ Progress(!gotCond, "We should not get the condition just yet (%s)",
+ gotCond ? "got it" : "didn't get it");
+ gotCond = condition.TimedWait(500);
+ Progress(!gotCond, "We should not get the condition just yet (%s)",
+ gotCond ? "got it" : "didn't get it");
+ gotCond = condition.TimedWait(5000);
+ Progress(gotCond, "We should have got the condition now (%s)",
+ gotCond ? "got it" : "didn't get it");
+ condition.Unlock();
+ }
+ Progress(true, "Waiting for threads to finish using pool.Close()...");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ void TryLockTest ()
+ {
+ TestHeader("Mutex TryLock Test");
+ FastOS_ThreadPool pool(128*1024);
+ Job job;
+ FastOS_Mutex mtx;
+ job.code = HOLD_MUTEX_FOR2SEC;
+ job.result = -1;
+ job.mutex = &mtx;
+ job.ownThread = pool.NewThread(this,
+ static_cast<void *>(&job));
+ Progress(job.ownThread !=NULL, "Creating thread");
+ if(job.ownThread != NULL)
+ {
+ bool lockrc;
+ FastOS_Thread::Sleep(1000);
+ for(int i=0; i<5; i++)
+ {
+ lockrc = mtx.TryLock();
+ Progress(!lockrc, "We should not get the mutex lock just yet (%s)",
+ lockrc ? "got it" : "didn't get it");
+ if(lockrc) {
+ mtx.Unlock();
+ break;
+ }
+ }
+ FastOS_Thread::Sleep(2000);
+ lockrc = mtx.TryLock();
+ Progress(lockrc, "We should get the mutex lock now (%s)",
+ lockrc ? "got it" : "didn't get it");
+ if(lockrc)
+ mtx.Unlock();
+ Progress(true, "Attempting to do normal lock...");
+ mtx.Lock();
+ Progress(true, "Got lock. Attempt to do normal unlock...");
+ mtx.Unlock();
+ Progress(true, "Unlock OK.");
+ }
+ Progress(true, "Waiting for threads to finish using pool.Close()...");
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ void ThreadStatsTest ()
+ {
+ int inactiveThreads;
+ int activeThreads;
+ int startedThreads;
+ TestHeader("Thread Statistics Test");
+ FastOS_ThreadPool pool(128*1024);
+ Job job[2];
+ inactiveThreads = pool.GetNumInactiveThreads();
+ Progress(inactiveThreads == 0, "Initial inactive threads = %d",
+ inactiveThreads);
+ activeThreads = pool.GetNumActiveThreads();
+ Progress(activeThreads == 0, "Initial active threads = %d",
+ activeThreads);
+ startedThreads = pool.GetNumStartedThreads();
+ Progress(startedThreads == 0, "Initial started threads = %d",
+ startedThreads);
+ job[0].code = WAIT_FOR_BREAK_FLAG;
+ job[0].ownThread = pool.NewThread(this,
+ static_cast<void *>(&job[0]));
+ FastOS_Thread::Sleep(1000);
+ inactiveThreads = pool.GetNumInactiveThreads();
+ Progress(inactiveThreads == 0, "Inactive threads = %d", inactiveThreads);
+ activeThreads = pool.GetNumActiveThreads();
+ Progress(activeThreads == 1, "Active threads = %d", activeThreads);
+ startedThreads = pool.GetNumStartedThreads();
+ Progress(startedThreads == 1, "Started threads = %d", startedThreads);
+ job[1].code = WAIT_FOR_BREAK_FLAG;
+ job[1].ownThread = pool.NewThread(this,
+ static_cast<void *>(&job[1]));
+ FastOS_Thread::Sleep(1000);
+ inactiveThreads = pool.GetNumInactiveThreads();
+ Progress(inactiveThreads == 0, "Inactive threads = %d", inactiveThreads);
+ activeThreads = pool.GetNumActiveThreads();
+ Progress(activeThreads == 2, "Active threads = %d", activeThreads);
+ startedThreads = pool.GetNumStartedThreads();
+ Progress(startedThreads == 2, "Started threads = %d", startedThreads);
+ Progress(true, "Setting breakflag on threads...");
+ job[0].ownThread->SetBreakFlag();
+ job[1].ownThread->SetBreakFlag();
+ FastOS_Thread::Sleep(3000);
+ inactiveThreads = pool.GetNumInactiveThreads();
+ Progress(inactiveThreads == 2, "Inactive threads = %d", inactiveThreads);
+ activeThreads = pool.GetNumActiveThreads();
+ Progress(activeThreads == 0, "Active threads = %d", activeThreads);
+ startedThreads = pool.GetNumStartedThreads();
+ Progress(startedThreads == 2, "Started threads = %d", startedThreads);
+ Progress(true, "Repeating process in the same pool...");
+ job[0].code = WAIT_FOR_BREAK_FLAG;
+ job[0].ownThread = pool.NewThread(this, static_cast<void *>(&job[0]));
+ FastOS_Thread::Sleep(1000);
+ inactiveThreads = pool.GetNumInactiveThreads();
+ Progress(inactiveThreads == 1, "Inactive threads = %d", inactiveThreads);
+ activeThreads = pool.GetNumActiveThreads();
+ Progress(activeThreads == 1, "Active threads = %d", activeThreads);
+ startedThreads = pool.GetNumStartedThreads();
+ Progress(startedThreads == 3, "Started threads = %d", startedThreads);
+ job[1].code = WAIT_FOR_BREAK_FLAG;
+ job[1].ownThread = pool.NewThread(this, static_cast<void *>(&job[1]));
+ FastOS_Thread::Sleep(1000);
+ inactiveThreads = pool.GetNumInactiveThreads();
+ Progress(inactiveThreads == 0, "Inactive threads = %d", inactiveThreads);
+ activeThreads = pool.GetNumActiveThreads();
+ Progress(activeThreads == 2, "Active threads = %d", activeThreads);
+ startedThreads = pool.GetNumStartedThreads();
+ Progress(startedThreads == 4, "Started threads = %d", startedThreads);
+ Progress(true, "Setting breakflag on threads...");
+ job[0].ownThread->SetBreakFlag();
+ job[1].ownThread->SetBreakFlag();
+ FastOS_Thread::Sleep(3000);
+ inactiveThreads = pool.GetNumInactiveThreads();
+ Progress(inactiveThreads == 2, "Inactive threads = %d", inactiveThreads);
+ activeThreads = pool.GetNumActiveThreads();
+ Progress(activeThreads == 0, "Active threads = %d", activeThreads);
+ startedThreads = pool.GetNumStartedThreads();
+ Progress(startedThreads == 4, "Started threads = %d", startedThreads);
+ pool.Close();
+ Progress(true, "Pool closed.");
+ PrintSeparator();
+ }
+ int Main ();
+ void LeakTest ()
+ {
+ TestHeader("Leak Test");
+ int allocCount = 2 * 1024 * 1024;
+ int progressIndex= allocCount/8;
+ int i;
+ for(i=0; i<allocCount; i++)
+ {
+ FastOS_Mutex *mtx = new FastOS_Mutex();
+ mtx->Lock();
+ mtx->Unlock();
+ delete mtx;
+ if((i % progressIndex) == (progressIndex - 1))
+ Progress(true, "Tested %d FastOS_Mutex instances", i + 1);
+ }
+ for(i=0; i<allocCount; i++)
+ {
+ FastOS_Cond *cond = new FastOS_Cond();
+ delete cond;
+ if((i % progressIndex) == (progressIndex - 1))
+ Progress(true, "Tested %d FastOS_Cond instances", i+1);
+ }
+ for(i=0; i<allocCount; i++)
+ {
+ FastOS_BoolCond *cond = new FastOS_BoolCond();
+ delete cond;
+ if((i % progressIndex) == (progressIndex - 1))
+ Progress(true, "Tested %d FastOS_BoolCond instances", i+1);
+ }
+ PrintSeparator();
+ }
+ void SynchronizationStressTest ()
+ {
+ TestHeader("Synchronization Object Stress Test");
+ const int allocCount = 150000;
+ int i;
+ FastOS_Mutex **mutexes = new FastOS_Mutex*[allocCount];
+ FastOS_Time startTime, nowTime;
+ startTime.SetNow();
+ for(i=0; i<allocCount; i++)
+ mutexes[i] = new FastOS_Mutex();
+ nowTime.SetNow();
+ Progress(true, "Allocated %d mutexes at time: %d ms", allocCount,
+ static_cast<int>(nowTime.MilliSecs() - startTime.MilliSecs()));
+ for(int e=0; e<4; e++)
+ {
+ for(i=0; i<allocCount; i++)
+ mutexes[i]->Lock();
+ for(i=0; i<allocCount; i++)
+ mutexes[i]->Unlock();
+ nowTime.SetNow();
+ Progress(true, "Tested %d mutexes at time: %d ms", allocCount,
+ static_cast<int>(nowTime.MilliSecs() -
+ startTime.MilliSecs()));
+ }
+ for(i=0; i<allocCount; i++)
+ delete mutexes[i];
+ nowTime.SetNow();
+ Progress(true, "Deleted %d mutexes at time: %d ms", allocCount,
+ static_cast<int>(nowTime.MilliSecs() - startTime.MilliSecs()));
+ delete [] mutexes;
+ PrintSeparator();
+ }
+int ThreadTest::Main ()
+ printf("grep for the string '%s' to detect failures.\n\n", failString);
+ // HowManyThreadsTest();
+ SynchronizationStressTest();
+ LeakTest();
+ ThreadStatsTest();
+ TryLockTest();
+ TimedWaitTest();
+ ThreadIdTest();
+ PriorityTest(1);
+ PriorityTest(-1);
+ SignalTest();
+ BroadcastTest();
+ CreateSingleThread();
+ CreateSingleThreadAndJoin();
+ TooManyThreadsTest();
+ MutexTest(false);
+ MutexTest(true);
+ ClosePoolTest();
+ BreakFlagTest();
+ SingleThreadJoinWaitMultipleTest(0);
+ SingleThreadJoinWaitMultipleTest(1);
+ SingleThreadJoinWaitMultipleTest(2);
+ SingleThreadJoinWaitMultipleTest(3);
+ SingleThreadJoinWaitMultipleTest(2);
+ SingleThreadJoinWaitMultipleTest(1);
+ SingleThreadJoinWaitMultipleTest(0);
+ BreakFlagTest();
+ ClosePoolTest();
+ MutexTest(true);
+ MutexTest(false);
+ TooManyThreadsTest();
+ CreateSingleThreadAndJoin();
+ CreateSingleThread();
+ BroadcastTest();
+ SignalTest();
+ ThreadCreatePerformance(false, 500, 100);
+ ClosePoolStability();
+ BounceTest();
+ printf("END OF TEST (%s)\n", _argv[0]);
+ return 0;
+volatile int busyCnt;
+void ThreadTest::Run (FastOS_ThreadInterface *thread, void *arg)
+ if(arg == NULL)
+ return;
+ Job *job = static_cast<Job *>(arg);
+ char someStack[15*1024];
+ memset(someStack, 0, 15*1024);
+ switch(job->code)
+ {
+ {
+ thread->SetPriority(job->threadPri);
+ job->boolcondition->WaitBusy();
+ double *p = job->timebuf;
+ for(int i=0; i<PRI_TIME_COUNT; i++)
+ {
+ for(int j=0; j<8192; j++)
+ {
+ busyCnt = busyCnt*10;
+ }
+ FastOS_Time now;
+ now.SetNow();
+ *p++ = now.MilliSecs();
+ }
+ break;
+ }
+ {
+ job->result = 1;
+ break;
+ }
+ case NOP:
+ {
+ Progress(true, "Doing NOP");
+ job->result = 1;
+ break;
+ }
+ {
+ Progress(true, "Thread printing message: [%s]", job->message);
+ job->result = strlen(job->message);
+ FastOS_Thread::Sleep(3000);
+ break;
+ }
+ {
+ int result;
+ if(job->mutex != NULL)
+ job->mutex->Lock();
+ result = static_cast<int>(number);
+ int sleepOn = (INCREASE_NUMBER_AMOUNT/2) * 321/10000;
+ for(int i=0; i<(INCREASE_NUMBER_AMOUNT/2); i++)
+ {
+ number = number + 2;
+ if(i == sleepOn)
+ FastOS_Thread::Sleep(1000);
+ }
+ if(job->mutex != NULL)
+ job->mutex->Unlock();
+ job->result = result; // This marks the end of the thread
+ break;
+ }
+ {
+ for(;;)
+ {
+ FastOS_Thread::Sleep(1000);
+ if(thread->GetBreakFlag())
+ {
+ Progress(true, "Thread %p got breakflag", thread);
+ break;
+ }
+ }
+ break;
+ }
+ {
+ if(job->mutex)
+ job->mutex->Lock();
+ if(job->otherThread != NULL)
+ job->otherThread->Join();
+ if(job->mutex)
+ job->mutex->Unlock();
+ break;
+ }
+ {
+ job->condition->Lock();
+ job->result = 1;
+ job->condition->Wait();
+ job->condition->Unlock();
+ job->result = 0;
+ break;
+ }
+ {
+ while (!thread->GetBreakFlag()) {
+ job->otherjob->condition->Lock();
+ job->otherjob->bouncewakeupcnt++;
+ job->otherjob->bouncewakeup = true;
+ job->otherjob->condition->Signal();
+ job->otherjob->condition->Unlock();
+ job->condition->Lock();
+ while (!job->bouncewakeup)
+ job->condition->TimedWait(1);
+ job->bouncewakeup = false;
+ job->condition->Unlock();
+ }
+ break;
+ }
+ case TEST_ID:
+ {
+ job->mutex->Lock(); // Initially the parent threads owns the lock
+ job->mutex->Unlock(); // It is unlocked when we should start
+ FastOS_ThreadId currentId = FastOS_Thread::GetCurrentThreadId();
+ if(currentId == job->_threadId)
+ job->result = 1;
+ else
+ job->result = -1;
+ break;
+ }
+ {
+ FastOS_Thread::Sleep(2000);
+ job->condition->Signal();
+ job->result = 1;
+ break;
+ }
+ {
+ job->mutex->Lock();
+ FastOS_Thread::Sleep(2000);
+ job->mutex->Unlock();
+ job->result = 1;
+ break;
+ }
+ case WAIT_2_SEC:
+ {
+ FastOS_Thread::Sleep(2000);
+ job->result = 1;
+ break;
+ }
+ default:
+ Progress(false, "Unknown jobcode");
+ break;
+ }
+int main (int argc, char **argv)
+ ThreadTest app;
+ setvbuf(stdout, NULL, _IOLBF, 8192);
+ return app.Entry(argc, argv);