summaryrefslogtreecommitdiffstats
path: root/vespalib/src/tests/executor
diff options
context:
space:
mode:
authorHaavard <havardpe@yahoo-inc.com>2016-06-22 12:05:14 +0000
committerHaavard <havardpe@yahoo-inc.com>2016-06-22 12:09:36 +0000
commite708a6f0629eba47a76e722377ddfc305e0b5900 (patch)
tree7a901b7410b3c36079257058b3b6f9bc213c5763 /vespalib/src/tests/executor
parent4b1530f59452e5896a11ac390202dc0f38bbfb1f (diff)
enable waiting until there are few enough active tasks
Enables task throttling without making the scheduler itself blocking
Diffstat (limited to 'vespalib/src/tests/executor')
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp41
1 files changed, 41 insertions, 0 deletions
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index 0eb08a80f41..5deab82b6fc 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -116,4 +116,45 @@ TEST_F("requireThatNewTasksAreDroppedAfterShutdown", MyState()) {
TEST_DO(f1.open().shutdown().execute(5).sync().check(5, 0, 0, 0));
}
+
+struct WaitTask : public Executor::Task {
+ Gate &gate;
+ WaitTask(Gate &g) : gate(g) {}
+ virtual void run() { gate.await(); }
+};
+
+struct WaitState {
+ ThreadStackExecutor executor;
+ std::vector<Gate> block_task;
+ std::vector<Gate> wait_done;
+ WaitState(size_t num_threads)
+ : executor(num_threads / 2, 128000), block_task(num_threads - 2), wait_done(num_threads - 1)
+ {
+ for (auto &gate: block_task) {
+ auto result = executor.execute(std::make_unique<WaitTask>(gate));
+ ASSERT_TRUE(result.get() == nullptr);
+ }
+ }
+ void wait(size_t count) {
+ executor.wait_for_task_count(count);
+ wait_done[count].countDown();
+ }
+};
+
+TEST_MT_F("require that threads can wait for a specific task count", 7, WaitState(num_threads)) {
+ if (thread_id == 0) {
+ for (size_t next_done = (num_threads - 2); next_done-- > 0;) {
+ if (next_done < f1.block_task.size()) {
+ f1.block_task[f1.block_task.size() - 1 - next_done].countDown();
+ }
+ EXPECT_TRUE(f1.wait_done[next_done].await(25000));
+ for (size_t i = 0; i < next_done; ++i) {
+ EXPECT_TRUE(!f1.wait_done[i].await(20));
+ }
+ }
+ } else {
+ f1.wait(thread_id - 1);
+ }
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }