diff options
Diffstat (limited to 'vespalib/src/tests/executor')
-rw-r--r-- | vespalib/src/tests/executor/.cvsignore | 3 | ||||
-rw-r--r-- | vespalib/src/tests/executor/.gitignore | 8 | ||||
-rw-r--r-- | vespalib/src/tests/executor/CMakeLists.txt | 22 | ||||
-rw-r--r-- | vespalib/src/tests/executor/DESC | 1 | ||||
-rw-r--r-- | vespalib/src/tests/executor/FILES | 1 | ||||
-rw-r--r-- | vespalib/src/tests/executor/executor_test.cpp | 43 | ||||
-rw-r--r-- | vespalib/src/tests/executor/stress_test.cpp | 150 | ||||
-rw-r--r-- | vespalib/src/tests/executor/threadstackexecutor_test.cpp | 119 |
8 files changed, 347 insertions, 0 deletions
diff --git a/vespalib/src/tests/executor/.cvsignore b/vespalib/src/tests/executor/.cvsignore new file mode 100644 index 00000000000..fe2e31358ec --- /dev/null +++ b/vespalib/src/tests/executor/.cvsignore @@ -0,0 +1,3 @@ +.depend +Makefile +executor_test diff --git a/vespalib/src/tests/executor/.gitignore b/vespalib/src/tests/executor/.gitignore new file mode 100644 index 00000000000..36ffae1f6ed --- /dev/null +++ b/vespalib/src/tests/executor/.gitignore @@ -0,0 +1,8 @@ +.depend +Makefile +executor_test +stress_test +threadstackexecutor_test +vespalib_executor_test_app +vespalib_stress_test_app +vespalib_threadstackexecutor_test_app diff --git a/vespalib/src/tests/executor/CMakeLists.txt b/vespalib/src/tests/executor/CMakeLists.txt new file mode 100644 index 00000000000..5b5ab2ab169 --- /dev/null +++ b/vespalib/src/tests/executor/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_threadstackexecutor_test_app + SOURCES + threadstackexecutor_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_threadstackexecutor_test_app COMMAND vespalib_threadstackexecutor_test_app) +vespa_add_executable(vespalib_executor_test_app + SOURCES + executor_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_executor_test_app COMMAND vespalib_executor_test_app) +vespa_add_executable(vespalib_stress_test_app + SOURCES + stress_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_stress_test_app COMMAND vespalib_stress_test_app BENCHMARK) diff --git a/vespalib/src/tests/executor/DESC b/vespalib/src/tests/executor/DESC new file mode 100644 index 00000000000..814694d4c20 --- /dev/null +++ b/vespalib/src/tests/executor/DESC @@ -0,0 +1 @@ +Tests code used to run tasks concurrently in multiple threads. diff --git a/vespalib/src/tests/executor/FILES b/vespalib/src/tests/executor/FILES new file mode 100644 index 00000000000..0a6c8c0a73d --- /dev/null +++ b/vespalib/src/tests/executor/FILES @@ -0,0 +1 @@ +threadstackexecutor_test.cpp diff --git a/vespalib/src/tests/executor/executor_test.cpp b/vespalib/src/tests/executor/executor_test.cpp new file mode 100644 index 00000000000..edb352c797c --- /dev/null +++ b/vespalib/src/tests/executor/executor_test.cpp @@ -0,0 +1,43 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Unit tests for executor. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("executor_test"); + +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/util/closuretask.h> + +using namespace vespalib; + +namespace { + +class Test : public vespalib::TestApp { + void requireThatClosuresCanBeWrappedInATask(); + +public: + int Main(); +}; + +int +Test::Main() +{ + TEST_INIT("executor_test"); + + TEST_DO(requireThatClosuresCanBeWrappedInATask()); + + TEST_DONE(); +} + +void setBool(bool *b) { *b = true; } +void Test::requireThatClosuresCanBeWrappedInATask() { + bool called = false; + Executor::Task::UP task = makeTask(makeClosure(setBool, &called)); + EXPECT_TRUE(!called); + task->run(); + EXPECT_TRUE(called); +} + +} // namespace + +TEST_APPHOOK(Test); diff --git a/vespalib/src/tests/executor/stress_test.cpp b/vespalib/src/tests/executor/stress_test.cpp new file mode 100644 index 00000000000..bf130dcb9dc --- /dev/null +++ b/vespalib/src/tests/executor/stress_test.cpp @@ -0,0 +1,150 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("executor_test"); +#include <vespa/vespalib/testkit/testapp.h> + +#include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/util/threadstackexecutor.h> + +#include <math.h> + +using namespace vespalib; + +uint32_t doStuff(uint32_t input) { + char buf[128]; + for (uint32_t i = 0; i < sizeof(buf); ++i) { + buf[i] = ((input + i) * i) & 0xff; + } + uint32_t result = 0; + for (uint32_t i = 0; i < sizeof(buf); ++i) { + result += ((buf[i] * i) + input) & 0xff; + } + return result; +} + +struct CPUTask : public Executor::Task { + uint32_t taskSize; + uint32_t &result; + CPUTask(uint32_t size, uint32_t &res) : taskSize(size), result(res) {} + virtual void run() { + uint32_t res = 0; + for (uint32_t i = 0; i < taskSize; ++i) { + res += doStuff(i); + } + result += res; + } +}; + +struct SyncTask : public Executor::Task { + Gate &gate; + CountDownLatch &latch; + SyncTask(Gate &g, CountDownLatch &l) : gate(g), latch(l) {} + virtual void run() { + latch.countDown(); + gate.await(); + } +}; + +class Test : public TestApp +{ +private: + uint32_t _result; + +public: + Test() : _result(0) {} + uint32_t calibrate(double ms); + void stress(Executor &executor, uint32_t taskSize, uint32_t numTasks); + int Main(); +}; + +uint32_t +Test::calibrate(double wanted_ms) +{ + uint32_t n = 0; + FastOS_Time t0; + FastOS_Time t1; + { // calibration of calibration loop + uint32_t result = 0; + t0.SetNow(); + double ms; + do { + result += doStuff(++n); + t1.SetNow(); + ms = (t1.MilliSecs() - t0.MilliSecs()); + } while (ms < 1000.0); + _result += result; + } + { // calibrate loop + t0.SetNow(); + uint32_t result = 0; + for (uint32_t i = 0; i < n; ++i) { + result += doStuff(i); + } + _result += result; + t1.SetNow(); + } + double ms = (t1.MilliSecs() - t0.MilliSecs()); + double size = (((double)n) / ms) * wanted_ms; + return (uint32_t) round(size); +} + +int +Test::Main() +{ + TEST_INIT("stress_test"); + if (_argc != 4) { + fprintf(stderr, "Usage: %s <threads> <ms per task> <tasks>\n", + _argv[0]); + TEST_DONE(); + } + uint32_t threads = atoi(_argv[1]); + double ms_per_task = strtod(_argv[2], 0); + uint32_t tasks = atoi(_argv[3]); + fprintf(stderr, "threads : %u\n", threads); + fprintf(stderr, "ms per task: %g\n", ms_per_task); + fprintf(stderr, "tasks : %u\n", tasks); + { + fprintf(stderr, "calibrating task size...\n"); + uint32_t taskSize = calibrate(ms_per_task); + fprintf(stderr, "calibrated task size: %u\n", taskSize); + ThreadStackExecutor executor(threads, 128000, 5000 + threads); + { + Gate gate; + CountDownLatch latch(threads); + for (uint32_t i = 0; i < threads; ++i) { + Executor::Task::UP res + = executor.execute(Executor::Task::UP( + new SyncTask(gate, latch))); + ASSERT_TRUE(res.get() == 0); + } + latch.await(); + gate.countDown(); + executor.sync(); + fprintf(stderr, "all threads have been accounted for...\n"); + } + { + FastOS_Time t0; + FastOS_Time t1; + fprintf(stderr, "starting task submission...\n"); + t0.SetNow(); + uint32_t result = 0; + for (uint32_t i = 0; i < tasks; ++i) { + Executor::Task::UP t(new CPUTask(taskSize, result)); + t = executor.execute(std::move(t)); + while (t.get() != 0) { + FastOS_Thread::Sleep(10); + t = executor.execute(std::move(t)); + } + } + executor.sync(); + t1.SetNow(); + double ms = (t1.MilliSecs() - t0.MilliSecs()); + fprintf(stderr, "total execution wall time: %g ms\n", ms); + _result += result; + } + } + TEST_DONE(); +} +TEST_APPHOOK(Test); diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp new file mode 100644 index 00000000000..0eb08a80f41 --- /dev/null +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -0,0 +1,119 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/atomic.h> + +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/sync.h> + +using namespace vespalib; + +typedef Executor::Task Task; + +struct MyTask : public Executor::Task { + Gate &gate; + CountDownLatch &latch; + static uint32_t runCnt; + static uint32_t deleteCnt; + MyTask(Gate &g, CountDownLatch &l) : gate(g), latch(l) {} + virtual void run() { + Atomic::postInc(&runCnt); + latch.countDown(); + gate.await(); + } + virtual ~MyTask() { + Atomic::postInc(&deleteCnt); + } + static void resetStats() { + runCnt = 0; + deleteCnt = 0; + } +}; +uint32_t MyTask::runCnt = 0; +uint32_t MyTask::deleteCnt = 0; + +struct MyState { + Gate gate; // to block workers + CountDownLatch latch; // to wait for workers + ThreadStackExecutor executor; + bool checked; + MyState() : gate(), latch(10), executor(10, 128000, 20), checked(false) + { + MyTask::resetStats(); + } + MyState &execute(uint32_t cnt) { + for (uint32_t i = 0; i < cnt; ++i) { + executor.execute(Task::UP(new MyTask(gate, latch))); + } + return *this; + } + MyState &sync() { + executor.sync(); + return *this; + } + MyState &shutdown() { + executor.shutdown(); + return *this; + } + MyState &open() { + gate.countDown(); + return *this; + } + MyState &wait() { + latch.await(); + return *this; + } + MyState &check(uint32_t expect_rejected, + uint32_t expect_queue, + uint32_t expect_running, + uint32_t expect_deleted) + { + ASSERT_TRUE(!checked); + checked = true; + ThreadStackExecutor::Stats stats = executor.getStats(); + EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt); + EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt); + EXPECT_EQUAL(expect_queue + expect_running + expect_deleted, + stats.acceptedTasks); + EXPECT_EQUAL(expect_rejected, stats.rejectedTasks); + EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0)); + if (expect_deleted == 0) { + EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks); + } + stats = executor.getStats(); + EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks); + EXPECT_EQUAL(0u, stats.acceptedTasks); + EXPECT_EQUAL(0u, stats.rejectedTasks); + return *this; + } +}; + + +TEST_F("requireThatTasksAreRunAndDeleted", MyState()) { + TEST_DO(f1.open().execute(5).sync().check(0, 0, 0, 5)); +} + +TEST_F("requireThatTasksRunConcurrently", MyState()) { + TEST_DO(f1.execute(10).wait().check(0, 0, 10, 0).open()); +} + +TEST_F("requireThatThreadCountIsRespected", MyState()) { + TEST_DO(f1.execute(20).wait().check(0, 10, 10, 0).open()); +} + +TEST_F("requireThatExtraTasksAreDropped", MyState()) { + TEST_DO(f1.execute(40).wait().check(20, 10, 10, 0).open()); +} + +TEST_F("requireThatActiveWorkersDrainInputQueue", MyState()) { + TEST_DO(f1.execute(20).wait().open().sync().check(0, 0, 0, 20)); +} + +TEST_F("requireThatPendingTasksAreRunAfterShutdown", MyState()) { + TEST_DO(f1.execute(20).wait().shutdown().open().sync().check(0, 0, 0, 20)); +} + +TEST_F("requireThatNewTasksAreDroppedAfterShutdown", MyState()) { + TEST_DO(f1.open().shutdown().execute(5).sync().check(5, 0, 0, 0)); +} + +TEST_MAIN() { TEST_RUN_ALL(); } |