summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahoo-inc.com>2016-07-07 15:40:13 +0200
committerGeir Storli <geirst@yahoo-inc.com>2016-07-07 15:40:13 +0200
commit691da61ea995fd056d2725c98feee0872324cf31 (patch)
treefc6e836fd7975f928957c8f51cf94626a88eb562 /vespalib
parent49d64d8a06c93564d520a17e6eea10852229bc9e (diff)
Add support for setting new task limit on BlockingThreadStackExecutor.
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp63
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h5
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h7
5 files changed, 67 insertions, 21 deletions
diff --git a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
index 297e53cca8d..f8136fe2a10 100644
--- a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
@@ -8,6 +8,8 @@
using namespace vespalib;
+constexpr int msWait = 30000;
+
class MyTask : public Executor::Task
{
private:
@@ -19,19 +21,17 @@ public:
: _entryGate(entryGate),
_exitLatch(exitLatch)
{}
-
virtual void run() override {
- _entryGate.await(30000);
+ _entryGate.await(msWait);
_exitLatch.countDown();
}
-
static Task::UP create(Gate &entryGate, CountDownLatch &exitLatch) {
return std::make_unique<MyTask>(entryGate, exitLatch);
}
};
void
-addTaskToExecutor(BlockingThreadStackExecutor *executor, Gate *workersEntryGate, CountDownLatch *workersExitLatch, Gate *exitGate)
+blockedExecute(BlockingThreadStackExecutor *executor, Gate *workersEntryGate, CountDownLatch *workersExitLatch, Gate *exitGate)
{
executor->execute(MyTask::create(*workersEntryGate, *workersExitLatch)); // this should be a blocking call
exitGate->countDown();
@@ -44,47 +44,68 @@ struct Fixture
BlockingThreadStackExecutor executor;
Gate workersEntryGate;
CountDownLatch workersExitLatch;
- Gate threadExitGate;
+ Gate blockedExecuteGate;
Fixture(uint32_t taskLimit, uint32_t tasksToWaitFor)
: executor(1, 128000, taskLimit),
workersEntryGate(),
workersExitLatch(tasksToWaitFor),
- threadExitGate()
+ blockedExecuteGate()
{}
void execute(size_t numTasks) {
for (size_t i = 0; i < numTasks; ++i) {
executor.execute(MyTask::create(workersEntryGate, workersExitLatch));
}
}
+ void updateTaskLimit(uint32_t taskLimit) {
+ executor.setTaskLimit(taskLimit);
+ }
void openForWorkers() {
workersEntryGate.countDown();
}
void waitForWorkers() {
- workersExitLatch.await(30000);
+ workersExitLatch.await(msWait);
+ }
+ void assertExecuteIsBlocked() {
+ blockedExecuteGate.await(10);
+ EXPECT_EQUAL(1u, blockedExecuteGate.getCount());
}
- void assertAddTaskThreadIsBlocked() {
- threadExitGate.await(10);
- EXPECT_EQUAL(1u, threadExitGate.getCount());
+ void waitForExecuteIsFinished() {
+ blockedExecuteGate.await(msWait);
+ EXPECT_EQUAL(0u, blockedExecuteGate.getCount());
}
- void assertAddTaskThreadIsFinished() {
- threadExitGate.await(30000);
- EXPECT_EQUAL(0u, threadExitGate.getCount());
+ ThreadUP blockedExecuteThread() {
+ return std::make_unique<std::thread>(blockedExecute, &executor, &workersEntryGate, &workersExitLatch, &blockedExecuteGate);
}
- ThreadUP createAddTaskThread() {
- return std::make_unique<std::thread>(addTaskToExecutor, &executor, &workersEntryGate, &workersExitLatch, &threadExitGate);
+ void blockedExecuteAndWaitUntilFinished() {
+ ThreadUP thread = blockedExecuteThread();
+ TEST_DO(assertExecuteIsBlocked());
+ openForWorkers();
+ TEST_DO(waitForExecuteIsFinished());
+ thread->join();
+ waitForWorkers();
}
};
TEST_F("require that execute() blocks when task limits is reached", Fixture(3, 4))
{
f.execute(3);
- ThreadUP thread = f.createAddTaskThread();
- TEST_DO(f.assertAddTaskThreadIsBlocked());
- f.openForWorkers();
- TEST_DO(f.assertAddTaskThreadIsFinished());
- thread->join();
- f.waitForWorkers();
+ f.blockedExecuteAndWaitUntilFinished();
+}
+
+TEST_F("require that task limit can be increased", Fixture(3, 5))
+{
+ f.execute(3);
+ f.updateTaskLimit(4);
+ f.execute(1);
+ f.blockedExecuteAndWaitUntilFinished();
+}
+
+TEST_F("require that task limit can be decreased", Fixture(3, 3))
+{
+ f.execute(2);
+ f.updateTaskLimit(2);
+ f.blockedExecuteAndWaitUntilFinished();
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
index 4df04d15ce4..95d0b147707 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
@@ -31,4 +31,10 @@ BlockingThreadStackExecutor::wakeup(MonitorGuard & monitor)
monitor.broadcast();
}
+void
+BlockingThreadStackExecutor::setTaskLimit(uint32_t taskLimit)
+{
+ ThreadStackExecutorBase::setTaskLimit(taskLimit);
+}
+
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
index 5fba2c7dbe6..6e1b7ad8100 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
@@ -27,6 +27,11 @@ public:
**/
BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit);
~BlockingThreadStackExecutor();
+
+ /**
+ * Sets a new upper limit for accepted number of tasks.
+ */
+ void setTaskLimit(uint32_t taskLimit);
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index 780f0ed5dd0..816c1feaf1b 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -129,6 +129,13 @@ ThreadStackExecutorBase::start(uint32_t threads)
}
}
+void
+ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit)
+{
+ MonitorGuard monitor(_monitor);
+ _taskLimit = taskLimit;
+}
+
ThreadStackExecutorBase::Stats
ThreadStackExecutorBase::getStats()
{
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 215a8377fc6..b5ab46ed335 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -139,6 +139,7 @@ protected:
* @param taskLimit upper limit on accepted tasks
**/
ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit);
+
/**
* This will start the theads. This is to avoid starting tasks in
* constructor of base class.
@@ -146,6 +147,12 @@ protected:
* @param threads number of worker threads (concurrent tasks)
*/
void start(uint32_t threads);
+
+ /**
+ * Sets a new upper limit for accepted number of tasks.
+ */
+ void setTaskLimit(uint32_t taskLimit);
+
public:
/**
* Observe and reset stats for this object.