diff options
author | Geir Storli <geirst@yahoo-inc.com> | 2016-07-07 15:40:13 +0200 |
---|---|---|
committer | Geir Storli <geirst@yahoo-inc.com> | 2016-07-07 15:40:13 +0200 |
commit | 691da61ea995fd056d2725c98feee0872324cf31 (patch) | |
tree | fc6e836fd7975f928957c8f51cf94626a88eb562 /vespalib | |
parent | 49d64d8a06c93564d520a17e6eea10852229bc9e (diff) |
Add support for setting new task limit on BlockingThreadStackExecutor.
Diffstat (limited to 'vespalib')
5 files changed, 67 insertions, 21 deletions
diff --git a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp index 297e53cca8d..f8136fe2a10 100644 --- a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp @@ -8,6 +8,8 @@ using namespace vespalib; +constexpr int msWait = 30000; + class MyTask : public Executor::Task { private: @@ -19,19 +21,17 @@ public: : _entryGate(entryGate), _exitLatch(exitLatch) {} - virtual void run() override { - _entryGate.await(30000); + _entryGate.await(msWait); _exitLatch.countDown(); } - static Task::UP create(Gate &entryGate, CountDownLatch &exitLatch) { return std::make_unique<MyTask>(entryGate, exitLatch); } }; void -addTaskToExecutor(BlockingThreadStackExecutor *executor, Gate *workersEntryGate, CountDownLatch *workersExitLatch, Gate *exitGate) +blockedExecute(BlockingThreadStackExecutor *executor, Gate *workersEntryGate, CountDownLatch *workersExitLatch, Gate *exitGate) { executor->execute(MyTask::create(*workersEntryGate, *workersExitLatch)); // this should be a blocking call exitGate->countDown(); @@ -44,47 +44,68 @@ struct Fixture BlockingThreadStackExecutor executor; Gate workersEntryGate; CountDownLatch workersExitLatch; - Gate threadExitGate; + Gate blockedExecuteGate; Fixture(uint32_t taskLimit, uint32_t tasksToWaitFor) : executor(1, 128000, taskLimit), workersEntryGate(), workersExitLatch(tasksToWaitFor), - threadExitGate() + blockedExecuteGate() {} void execute(size_t numTasks) { for (size_t i = 0; i < numTasks; ++i) { executor.execute(MyTask::create(workersEntryGate, workersExitLatch)); } } + void updateTaskLimit(uint32_t taskLimit) { + executor.setTaskLimit(taskLimit); + } void openForWorkers() { workersEntryGate.countDown(); } void waitForWorkers() { - workersExitLatch.await(30000); + workersExitLatch.await(msWait); + } + void assertExecuteIsBlocked() { + blockedExecuteGate.await(10); + EXPECT_EQUAL(1u, blockedExecuteGate.getCount()); } - void assertAddTaskThreadIsBlocked() { - threadExitGate.await(10); - EXPECT_EQUAL(1u, threadExitGate.getCount()); + void waitForExecuteIsFinished() { + blockedExecuteGate.await(msWait); + EXPECT_EQUAL(0u, blockedExecuteGate.getCount()); } - void assertAddTaskThreadIsFinished() { - threadExitGate.await(30000); - EXPECT_EQUAL(0u, threadExitGate.getCount()); + ThreadUP blockedExecuteThread() { + return std::make_unique<std::thread>(blockedExecute, &executor, &workersEntryGate, &workersExitLatch, &blockedExecuteGate); } - ThreadUP createAddTaskThread() { - return std::make_unique<std::thread>(addTaskToExecutor, &executor, &workersEntryGate, &workersExitLatch, &threadExitGate); + void blockedExecuteAndWaitUntilFinished() { + ThreadUP thread = blockedExecuteThread(); + TEST_DO(assertExecuteIsBlocked()); + openForWorkers(); + TEST_DO(waitForExecuteIsFinished()); + thread->join(); + waitForWorkers(); } }; TEST_F("require that execute() blocks when task limits is reached", Fixture(3, 4)) { f.execute(3); - ThreadUP thread = f.createAddTaskThread(); - TEST_DO(f.assertAddTaskThreadIsBlocked()); - f.openForWorkers(); - TEST_DO(f.assertAddTaskThreadIsFinished()); - thread->join(); - f.waitForWorkers(); + f.blockedExecuteAndWaitUntilFinished(); +} + +TEST_F("require that task limit can be increased", Fixture(3, 5)) +{ + f.execute(3); + f.updateTaskLimit(4); + f.execute(1); + f.blockedExecuteAndWaitUntilFinished(); +} + +TEST_F("require that task limit can be decreased", Fixture(3, 3)) +{ + f.execute(2); + f.updateTaskLimit(2); + f.blockedExecuteAndWaitUntilFinished(); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp index 4df04d15ce4..95d0b147707 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp @@ -31,4 +31,10 @@ BlockingThreadStackExecutor::wakeup(MonitorGuard & monitor) monitor.broadcast(); } +void +BlockingThreadStackExecutor::setTaskLimit(uint32_t taskLimit) +{ + ThreadStackExecutorBase::setTaskLimit(taskLimit); +} + } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h index 5fba2c7dbe6..6e1b7ad8100 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h @@ -27,6 +27,11 @@ public: **/ BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit); ~BlockingThreadStackExecutor(); + + /** + * Sets a new upper limit for accepted number of tasks. + */ + void setTaskLimit(uint32_t taskLimit); }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 780f0ed5dd0..816c1feaf1b 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -129,6 +129,13 @@ ThreadStackExecutorBase::start(uint32_t threads) } } +void +ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit) +{ + MonitorGuard monitor(_monitor); + _taskLimit = taskLimit; +} + ThreadStackExecutorBase::Stats ThreadStackExecutorBase::getStats() { diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 215a8377fc6..b5ab46ed335 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -139,6 +139,7 @@ protected: * @param taskLimit upper limit on accepted tasks **/ ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit); + /** * This will start the theads. This is to avoid starting tasks in * constructor of base class. @@ -146,6 +147,12 @@ protected: * @param threads number of worker threads (concurrent tasks) */ void start(uint32_t threads); + + /** + * Sets a new upper limit for accepted number of tasks. + */ + void setTaskLimit(uint32_t taskLimit); + public: /** * Observe and reset stats for this object. |