summaryrefslogtreecommitdiffstats
path: root/vespalib/src/tests/executor
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /vespalib/src/tests/executor
Publish
Diffstat (limited to 'vespalib/src/tests/executor')
-rw-r--r--vespalib/src/tests/executor/.cvsignore3
-rw-r--r--vespalib/src/tests/executor/.gitignore8
-rw-r--r--vespalib/src/tests/executor/CMakeLists.txt22
-rw-r--r--vespalib/src/tests/executor/DESC1
-rw-r--r--vespalib/src/tests/executor/FILES1
-rw-r--r--vespalib/src/tests/executor/executor_test.cpp43
-rw-r--r--vespalib/src/tests/executor/stress_test.cpp150
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp119
8 files changed, 347 insertions, 0 deletions
diff --git a/vespalib/src/tests/executor/.cvsignore b/vespalib/src/tests/executor/.cvsignore
new file mode 100644
index 00000000000..fe2e31358ec
--- /dev/null
+++ b/vespalib/src/tests/executor/.cvsignore
@@ -0,0 +1,3 @@
+.depend
+Makefile
+executor_test
diff --git a/vespalib/src/tests/executor/.gitignore b/vespalib/src/tests/executor/.gitignore
new file mode 100644
index 00000000000..36ffae1f6ed
--- /dev/null
+++ b/vespalib/src/tests/executor/.gitignore
@@ -0,0 +1,8 @@
+.depend
+Makefile
+executor_test
+stress_test
+threadstackexecutor_test
+vespalib_executor_test_app
+vespalib_stress_test_app
+vespalib_threadstackexecutor_test_app
diff --git a/vespalib/src/tests/executor/CMakeLists.txt b/vespalib/src/tests/executor/CMakeLists.txt
new file mode 100644
index 00000000000..5b5ab2ab169
--- /dev/null
+++ b/vespalib/src/tests/executor/CMakeLists.txt
@@ -0,0 +1,22 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_threadstackexecutor_test_app
+ SOURCES
+ threadstackexecutor_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_threadstackexecutor_test_app COMMAND vespalib_threadstackexecutor_test_app)
+vespa_add_executable(vespalib_executor_test_app
+ SOURCES
+ executor_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_executor_test_app COMMAND vespalib_executor_test_app)
+vespa_add_executable(vespalib_stress_test_app
+ SOURCES
+ stress_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_stress_test_app COMMAND vespalib_stress_test_app BENCHMARK)
diff --git a/vespalib/src/tests/executor/DESC b/vespalib/src/tests/executor/DESC
new file mode 100644
index 00000000000..814694d4c20
--- /dev/null
+++ b/vespalib/src/tests/executor/DESC
@@ -0,0 +1 @@
+Tests code used to run tasks concurrently in multiple threads.
diff --git a/vespalib/src/tests/executor/FILES b/vespalib/src/tests/executor/FILES
new file mode 100644
index 00000000000..0a6c8c0a73d
--- /dev/null
+++ b/vespalib/src/tests/executor/FILES
@@ -0,0 +1 @@
+threadstackexecutor_test.cpp
diff --git a/vespalib/src/tests/executor/executor_test.cpp b/vespalib/src/tests/executor/executor_test.cpp
new file mode 100644
index 00000000000..edb352c797c
--- /dev/null
+++ b/vespalib/src/tests/executor/executor_test.cpp
@@ -0,0 +1,43 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Unit tests for executor.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("executor_test");
+
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/vespalib/util/closuretask.h>
+
+using namespace vespalib;
+
+namespace {
+
+class Test : public vespalib::TestApp {
+ void requireThatClosuresCanBeWrappedInATask();
+
+public:
+ int Main();
+};
+
+int
+Test::Main()
+{
+ TEST_INIT("executor_test");
+
+ TEST_DO(requireThatClosuresCanBeWrappedInATask());
+
+ TEST_DONE();
+}
+
+void setBool(bool *b) { *b = true; }
+void Test::requireThatClosuresCanBeWrappedInATask() {
+ bool called = false;
+ Executor::Task::UP task = makeTask(makeClosure(setBool, &called));
+ EXPECT_TRUE(!called);
+ task->run();
+ EXPECT_TRUE(called);
+}
+
+} // namespace
+
+TEST_APPHOOK(Test);
diff --git a/vespalib/src/tests/executor/stress_test.cpp b/vespalib/src/tests/executor/stress_test.cpp
new file mode 100644
index 00000000000..bf130dcb9dc
--- /dev/null
+++ b/vespalib/src/tests/executor/stress_test.cpp
@@ -0,0 +1,150 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("executor_test");
+#include <vespa/vespalib/testkit/testapp.h>
+
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/sync.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+
+#include <math.h>
+
+using namespace vespalib;
+
+uint32_t doStuff(uint32_t input) {
+ char buf[128];
+ for (uint32_t i = 0; i < sizeof(buf); ++i) {
+ buf[i] = ((input + i) * i) & 0xff;
+ }
+ uint32_t result = 0;
+ for (uint32_t i = 0; i < sizeof(buf); ++i) {
+ result += ((buf[i] * i) + input) & 0xff;
+ }
+ return result;
+}
+
+struct CPUTask : public Executor::Task {
+ uint32_t taskSize;
+ uint32_t &result;
+ CPUTask(uint32_t size, uint32_t &res) : taskSize(size), result(res) {}
+ virtual void run() {
+ uint32_t res = 0;
+ for (uint32_t i = 0; i < taskSize; ++i) {
+ res += doStuff(i);
+ }
+ result += res;
+ }
+};
+
+struct SyncTask : public Executor::Task {
+ Gate &gate;
+ CountDownLatch &latch;
+ SyncTask(Gate &g, CountDownLatch &l) : gate(g), latch(l) {}
+ virtual void run() {
+ latch.countDown();
+ gate.await();
+ }
+};
+
+class Test : public TestApp
+{
+private:
+ uint32_t _result;
+
+public:
+ Test() : _result(0) {}
+ uint32_t calibrate(double ms);
+ void stress(Executor &executor, uint32_t taskSize, uint32_t numTasks);
+ int Main();
+};
+
+uint32_t
+Test::calibrate(double wanted_ms)
+{
+ uint32_t n = 0;
+ FastOS_Time t0;
+ FastOS_Time t1;
+ { // calibration of calibration loop
+ uint32_t result = 0;
+ t0.SetNow();
+ double ms;
+ do {
+ result += doStuff(++n);
+ t1.SetNow();
+ ms = (t1.MilliSecs() - t0.MilliSecs());
+ } while (ms < 1000.0);
+ _result += result;
+ }
+ { // calibrate loop
+ t0.SetNow();
+ uint32_t result = 0;
+ for (uint32_t i = 0; i < n; ++i) {
+ result += doStuff(i);
+ }
+ _result += result;
+ t1.SetNow();
+ }
+ double ms = (t1.MilliSecs() - t0.MilliSecs());
+ double size = (((double)n) / ms) * wanted_ms;
+ return (uint32_t) round(size);
+}
+
+int
+Test::Main()
+{
+ TEST_INIT("stress_test");
+ if (_argc != 4) {
+ fprintf(stderr, "Usage: %s <threads> <ms per task> <tasks>\n",
+ _argv[0]);
+ TEST_DONE();
+ }
+ uint32_t threads = atoi(_argv[1]);
+ double ms_per_task = strtod(_argv[2], 0);
+ uint32_t tasks = atoi(_argv[3]);
+ fprintf(stderr, "threads : %u\n", threads);
+ fprintf(stderr, "ms per task: %g\n", ms_per_task);
+ fprintf(stderr, "tasks : %u\n", tasks);
+ {
+ fprintf(stderr, "calibrating task size...\n");
+ uint32_t taskSize = calibrate(ms_per_task);
+ fprintf(stderr, "calibrated task size: %u\n", taskSize);
+ ThreadStackExecutor executor(threads, 128000, 5000 + threads);
+ {
+ Gate gate;
+ CountDownLatch latch(threads);
+ for (uint32_t i = 0; i < threads; ++i) {
+ Executor::Task::UP res
+ = executor.execute(Executor::Task::UP(
+ new SyncTask(gate, latch)));
+ ASSERT_TRUE(res.get() == 0);
+ }
+ latch.await();
+ gate.countDown();
+ executor.sync();
+ fprintf(stderr, "all threads have been accounted for...\n");
+ }
+ {
+ FastOS_Time t0;
+ FastOS_Time t1;
+ fprintf(stderr, "starting task submission...\n");
+ t0.SetNow();
+ uint32_t result = 0;
+ for (uint32_t i = 0; i < tasks; ++i) {
+ Executor::Task::UP t(new CPUTask(taskSize, result));
+ t = executor.execute(std::move(t));
+ while (t.get() != 0) {
+ FastOS_Thread::Sleep(10);
+ t = executor.execute(std::move(t));
+ }
+ }
+ executor.sync();
+ t1.SetNow();
+ double ms = (t1.MilliSecs() - t0.MilliSecs());
+ fprintf(stderr, "total execution wall time: %g ms\n", ms);
+ _result += result;
+ }
+ }
+ TEST_DONE();
+}
+TEST_APPHOOK(Test);
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
new file mode 100644
index 00000000000..0eb08a80f41
--- /dev/null
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -0,0 +1,119 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/atomic.h>
+
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/sync.h>
+
+using namespace vespalib;
+
+typedef Executor::Task Task;
+
+struct MyTask : public Executor::Task {
+ Gate &gate;
+ CountDownLatch &latch;
+ static uint32_t runCnt;
+ static uint32_t deleteCnt;
+ MyTask(Gate &g, CountDownLatch &l) : gate(g), latch(l) {}
+ virtual void run() {
+ Atomic::postInc(&runCnt);
+ latch.countDown();
+ gate.await();
+ }
+ virtual ~MyTask() {
+ Atomic::postInc(&deleteCnt);
+ }
+ static void resetStats() {
+ runCnt = 0;
+ deleteCnt = 0;
+ }
+};
+uint32_t MyTask::runCnt = 0;
+uint32_t MyTask::deleteCnt = 0;
+
+struct MyState {
+ Gate gate; // to block workers
+ CountDownLatch latch; // to wait for workers
+ ThreadStackExecutor executor;
+ bool checked;
+ MyState() : gate(), latch(10), executor(10, 128000, 20), checked(false)
+ {
+ MyTask::resetStats();
+ }
+ MyState &execute(uint32_t cnt) {
+ for (uint32_t i = 0; i < cnt; ++i) {
+ executor.execute(Task::UP(new MyTask(gate, latch)));
+ }
+ return *this;
+ }
+ MyState &sync() {
+ executor.sync();
+ return *this;
+ }
+ MyState &shutdown() {
+ executor.shutdown();
+ return *this;
+ }
+ MyState &open() {
+ gate.countDown();
+ return *this;
+ }
+ MyState &wait() {
+ latch.await();
+ return *this;
+ }
+ MyState &check(uint32_t expect_rejected,
+ uint32_t expect_queue,
+ uint32_t expect_running,
+ uint32_t expect_deleted)
+ {
+ ASSERT_TRUE(!checked);
+ checked = true;
+ ThreadStackExecutor::Stats stats = executor.getStats();
+ EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt);
+ EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt);
+ EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,
+ stats.acceptedTasks);
+ EXPECT_EQUAL(expect_rejected, stats.rejectedTasks);
+ EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0));
+ if (expect_deleted == 0) {
+ EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks);
+ }
+ stats = executor.getStats();
+ EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks);
+ EXPECT_EQUAL(0u, stats.acceptedTasks);
+ EXPECT_EQUAL(0u, stats.rejectedTasks);
+ return *this;
+ }
+};
+
+
+TEST_F("requireThatTasksAreRunAndDeleted", MyState()) {
+ TEST_DO(f1.open().execute(5).sync().check(0, 0, 0, 5));
+}
+
+TEST_F("requireThatTasksRunConcurrently", MyState()) {
+ TEST_DO(f1.execute(10).wait().check(0, 0, 10, 0).open());
+}
+
+TEST_F("requireThatThreadCountIsRespected", MyState()) {
+ TEST_DO(f1.execute(20).wait().check(0, 10, 10, 0).open());
+}
+
+TEST_F("requireThatExtraTasksAreDropped", MyState()) {
+ TEST_DO(f1.execute(40).wait().check(20, 10, 10, 0).open());
+}
+
+TEST_F("requireThatActiveWorkersDrainInputQueue", MyState()) {
+ TEST_DO(f1.execute(20).wait().open().sync().check(0, 0, 0, 20));
+}
+
+TEST_F("requireThatPendingTasksAreRunAfterShutdown", MyState()) {
+ TEST_DO(f1.execute(20).wait().shutdown().open().sync().check(0, 0, 0, 20));
+}
+
+TEST_F("requireThatNewTasksAreDroppedAfterShutdown", MyState()) {
+ TEST_DO(f1.open().shutdown().execute(5).sync().check(5, 0, 0, 0));
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }