aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src/tests/executor/threadstackexecutor_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'vespalib/src/tests/executor/threadstackexecutor_test.cpp')
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp119
1 files changed, 119 insertions, 0 deletions
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
new file mode 100644
index 00000000000..0eb08a80f41
--- /dev/null
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -0,0 +1,119 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/atomic.h>
+
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/sync.h>
+
+using namespace vespalib;
+
+typedef Executor::Task Task;
+
+struct MyTask : public Executor::Task {
+ Gate &gate;
+ CountDownLatch &latch;
+ static uint32_t runCnt;
+ static uint32_t deleteCnt;
+ MyTask(Gate &g, CountDownLatch &l) : gate(g), latch(l) {}
+ virtual void run() {
+ Atomic::postInc(&runCnt);
+ latch.countDown();
+ gate.await();
+ }
+ virtual ~MyTask() {
+ Atomic::postInc(&deleteCnt);
+ }
+ static void resetStats() {
+ runCnt = 0;
+ deleteCnt = 0;
+ }
+};
+uint32_t MyTask::runCnt = 0;
+uint32_t MyTask::deleteCnt = 0;
+
+struct MyState {
+ Gate gate; // to block workers
+ CountDownLatch latch; // to wait for workers
+ ThreadStackExecutor executor;
+ bool checked;
+ MyState() : gate(), latch(10), executor(10, 128000, 20), checked(false)
+ {
+ MyTask::resetStats();
+ }
+ MyState &execute(uint32_t cnt) {
+ for (uint32_t i = 0; i < cnt; ++i) {
+ executor.execute(Task::UP(new MyTask(gate, latch)));
+ }
+ return *this;
+ }
+ MyState &sync() {
+ executor.sync();
+ return *this;
+ }
+ MyState &shutdown() {
+ executor.shutdown();
+ return *this;
+ }
+ MyState &open() {
+ gate.countDown();
+ return *this;
+ }
+ MyState &wait() {
+ latch.await();
+ return *this;
+ }
+ MyState &check(uint32_t expect_rejected,
+ uint32_t expect_queue,
+ uint32_t expect_running,
+ uint32_t expect_deleted)
+ {
+ ASSERT_TRUE(!checked);
+ checked = true;
+ ThreadStackExecutor::Stats stats = executor.getStats();
+ EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt);
+ EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt);
+ EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,
+ stats.acceptedTasks);
+ EXPECT_EQUAL(expect_rejected, stats.rejectedTasks);
+ EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0));
+ if (expect_deleted == 0) {
+ EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks);
+ }
+ stats = executor.getStats();
+ EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks);
+ EXPECT_EQUAL(0u, stats.acceptedTasks);
+ EXPECT_EQUAL(0u, stats.rejectedTasks);
+ return *this;
+ }
+};
+
+
+TEST_F("requireThatTasksAreRunAndDeleted", MyState()) {
+ TEST_DO(f1.open().execute(5).sync().check(0, 0, 0, 5));
+}
+
+TEST_F("requireThatTasksRunConcurrently", MyState()) {
+ TEST_DO(f1.execute(10).wait().check(0, 0, 10, 0).open());
+}
+
+TEST_F("requireThatThreadCountIsRespected", MyState()) {
+ TEST_DO(f1.execute(20).wait().check(0, 10, 10, 0).open());
+}
+
+TEST_F("requireThatExtraTasksAreDropped", MyState()) {
+ TEST_DO(f1.execute(40).wait().check(20, 10, 10, 0).open());
+}
+
+TEST_F("requireThatActiveWorkersDrainInputQueue", MyState()) {
+ TEST_DO(f1.execute(20).wait().open().sync().check(0, 0, 0, 20));
+}
+
+TEST_F("requireThatPendingTasksAreRunAfterShutdown", MyState()) {
+ TEST_DO(f1.execute(20).wait().shutdown().open().sync().check(0, 0, 0, 20));
+}
+
+TEST_F("requireThatNewTasksAreDroppedAfterShutdown", MyState()) {
+ TEST_DO(f1.open().shutdown().execute(5).sync().check(5, 0, 0, 0));
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }