author    Haavard <havardpe@yahoo-inc.com>    2016-06-22 12:05:14 +0000
committer Haavard <havardpe@yahoo-inc.com>    2016-06-22 12:09:36 +0000
commit    e708a6f0629eba47a76e722377ddfc305e0b5900 (patch)
tree      7a901b7410b3c36079257058b3b6f9bc213c5763 /vespalib
parent    4b1530f59452e5896a11ac390202dc0f38bbfb1f (diff)
enable waiting until there are few enough active tasks
Enables task throttling without making the scheduler itself blocking
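
In practice this lets the producer, rather than the scheduler, absorb back-pressure: a thread that generates work can wait until the executor has drained below a chosen watermark before submitting more. A minimal sketch of that pattern, assuming the installed include path and a hypothetical MyTask type (the watermark value is illustrative, not part of this change):

    #include <vespa/vespalib/util/threadstackexecutor.h>
    #include <memory>

    // Hypothetical task type, for illustration only.
    struct MyTask : vespalib::Executor::Task {
        size_t id;
        explicit MyTask(size_t id_in) : id(id_in) {}
        void run() override { /* do the work for item 'id' */ }
    };

    int main() {
        vespalib::ThreadStackExecutor executor(4, 128000);
        const uint32_t low_watermark = 10; // illustrative threshold
        for (size_t i = 0; i < 1000; ++i) {
            // Block the producer (never a worker) until at most
            // low_watermark tasks remain active, then submit more work.
            executor.wait_for_task_count(low_watermark);
            executor.execute(std::make_unique<MyTask>(i));
        }
        executor.sync(); // drain the tail of the work
    }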
Diffstat (limited to 'vespalib')
-rw-r--r--  vespalib/src/tests/executor/threadstackexecutor_test.cpp      | 41
-rw-r--r--  vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp  | 56
-rw-r--r--  vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h    | 22
3 files changed, 119 insertions, 0 deletions
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index 0eb08a80f41..5deab82b6fc 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -116,4 +116,45 @@ TEST_F("requireThatNewTasksAreDroppedAfterShutdown", MyState()) {
TEST_DO(f1.open().shutdown().execute(5).sync().check(5, 0, 0, 0));
}
+
+struct WaitTask : public Executor::Task {
+ Gate &gate;
+ WaitTask(Gate &g) : gate(g) {}
+ virtual void run() { gate.await(); }
+};
+
+struct WaitState {
+ ThreadStackExecutor executor;
+ std::vector<Gate> block_task;
+ std::vector<Gate> wait_done;
+ WaitState(size_t num_threads)
+ : executor(num_threads / 2, 128000), block_task(num_threads - 2), wait_done(num_threads - 1)
+ {
+ for (auto &gate: block_task) {
+ auto result = executor.execute(std::make_unique<WaitTask>(gate));
+ ASSERT_TRUE(result.get() == nullptr);
+ }
+ }
+ void wait(size_t count) {
+ executor.wait_for_task_count(count);
+ wait_done[count].countDown();
+ }
+};
+
+TEST_MT_F("require that threads can wait for a specific task count", 7, WaitState(num_threads)) {
+ if (thread_id == 0) {
+ for (size_t next_done = (num_threads - 2); next_done-- > 0;) {
+ if (next_done < f1.block_task.size()) {
+ f1.block_task[f1.block_task.size() - 1 - next_done].countDown();
+ }
+ EXPECT_TRUE(f1.wait_done[next_done].await(25000));
+ for (size_t i = 0; i < next_done; ++i) {
+ EXPECT_TRUE(!f1.wait_done[i].await(20));
+ }
+ }
+ } else {
+ f1.wait(thread_id - 1);
+ }
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
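
A hedged reading of the choreography above, inferred from the fixture: with 7 test threads the executor gets 3 workers (7 / 2) and 5 gate-blocked WaitTasks, so the active task count starts at 5. Threads 1-6 each wait for a distinct count 0-5 and report through wait_done; the count-5 waiter is satisfied immediately. Thread 0 then opens one gate at a time and checks that exactly one more waiter wakes per completed task, strictly in descending threshold order:

    // taskCount = 5                       -> wait_for_task_count(5) returns at once
    // open block_task[0], taskCount -> 4  -> wait_done[4] fires
    // open block_task[1], taskCount -> 3  -> wait_done[3] fires
    //   ... and so on down to ...
    // open block_task[4], taskCount -> 0  -> wait_done[0] fires
    //
    // After each step the short await(20) probes verify that no waiter
    // with a lower threshold has been woken early.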
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index bee1d9c17ec..780f0ed5dd0 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -6,6 +6,47 @@
namespace vespalib {
void
+ThreadStackExecutorBase::BlockedThread::wait() const
+{
+ MonitorGuard guard(monitor);
+ while (blocked) {
+ guard.wait();
+ }
+}
+
+void
+ThreadStackExecutorBase::BlockedThread::unblock()
+{
+ MonitorGuard guard(monitor);
+ blocked = false;
+ guard.signal();
+}
+
+//-----------------------------------------------------------------------------
+
+void
+ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_thread)
+{
+ auto pos = _blocked.begin();
+ while ((pos != _blocked.end()) &&
+ ((*pos)->wait_task_count < blocked_thread.wait_task_count))
+ {
+ ++pos;
+ }
+ _blocked.insert(pos, &blocked_thread);
+}
+
+void
+ThreadStackExecutorBase::unblock_threads(const MonitorGuard &)
+{
+ while (!_blocked.empty() && (_taskCount <= _blocked.back()->wait_task_count)) {
+ BlockedThread &blocked_thread = *(_blocked.back());
+ _blocked.pop_back();
+ blocked_thread.unblock();
+ }
+}
+
+void
ThreadStackExecutorBase::assignTask(const TaggedTask &task, Worker &worker)
{
MonitorGuard monitor(worker.monitor);
@@ -26,6 +67,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
_barrier.completeEvent(worker.task.token);
worker.task.task = 0;
}
+ unblock_threads(monitor);
if (!_tasks.empty()) {
worker.task = _tasks.front();
_tasks.pop();
@@ -168,6 +210,19 @@ ThreadStackExecutorBase::sync()
}
void
+ThreadStackExecutorBase::wait_for_task_count(uint32_t task_count)
+{
+ LockGuard lock(_monitor);
+ if (_taskCount <= task_count) {
+ return;
+ }
+ BlockedThread self(task_count);
+ block_thread(lock, self);
+ lock.unlock(); // <- UNLOCK
+ self.wait();
+}
+
+void
ThreadStackExecutorBase::cleanup()
{
shutdown().sync();
@@ -179,6 +234,7 @@ ThreadStackExecutorBase::~ThreadStackExecutorBase()
{
assert(_pool.isClosed());
assert(_taskCount == 0);
+ assert(_blocked.empty());
}
} // namespace vespalib
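
The waiter bookkeeping above is the core of the change: block_thread keeps _blocked sorted ascending by wait_task_count, so the waiter easiest to satisfy sits at the back, and unblock_threads (run each time a worker retires a task) only ever inspects the back of the vector. A self-contained sketch of the same scheme, substituting std::condition_variable for vespalib's Monitor (the names here are illustrative, not the real classes):

    #include <algorithm>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <vector>

    // One blocked waiter; it may proceed once the task count <= threshold.
    struct Waiter {
        const uint32_t threshold;
        std::mutex mutex;
        std::condition_variable cond;
        bool blocked = true;
        explicit Waiter(uint32_t t) : threshold(t) {}
        void wait() {
            std::unique_lock<std::mutex> guard(mutex);
            cond.wait(guard, [this] { return !blocked; });
        }
        void unblock() {
            { std::lock_guard<std::mutex> guard(mutex); blocked = false; }
            cond.notify_one();
        }
    };

    // Insert sorted ascending by threshold (caller holds the executor
    // lock), mirroring ThreadStackExecutorBase::block_thread.
    void block_thread(std::vector<Waiter *> &blocked, Waiter &self) {
        auto pos = std::find_if(blocked.begin(), blocked.end(),
                                [&](const Waiter *w) { return w->threshold >= self.threshold; });
        blocked.insert(pos, &self);
    }

    // Wake waiters from the back for as long as the falling task count
    // already satisfies them, mirroring unblock_threads above.
    void unblock_threads(std::vector<Waiter *> &blocked, uint32_t task_count) {
        while (!blocked.empty() && task_count <= blocked.back()->threshold) {
            Waiter *ready = blocked.back();
            blocked.pop_back();
            ready->unblock();
        }
    }

Keeping the list sorted makes each wake-up check O(1): once the back element's threshold is below the current task count, no earlier element can qualify either, so the scan stops immediately.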
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index f66cd74dae1..215a8377fc6 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -52,17 +52,31 @@ private:
void completeBarrier() { gate.countDown(); }
};
+ struct BlockedThread {
+ const uint32_t wait_task_count;
+ Monitor monitor;
+ bool blocked;
+ BlockedThread(uint32_t wait_task_count_in)
+ : wait_task_count(wait_task_count_in), monitor(), blocked(true) {}
+ void wait() const;
+ void unblock();
+ };
+
FastOS_ThreadPool _pool;
Monitor _monitor;
Stats _stats;
Gate _executorCompletion;
ArrayQueue<TaggedTask> _tasks;
ArrayQueue<Worker*> _workers;
+ std::vector<BlockedThread*> _blocked;
EventBarrier<BarrierCompletion> _barrier;
uint32_t _taskCount;
uint32_t _taskLimit;
bool _closed;
+ void block_thread(const LockGuard &, BlockedThread &blocked_thread);
+ void unblock_threads(const MonitorGuard &);
+
/**
* Assign the given task to the given idle worker. This will wake
* up a worker thread that is blocked in the obtainTask function.
@@ -153,6 +167,14 @@ public:
virtual ThreadStackExecutorBase &sync();
+ /**
+ * Block the calling thread until the current task count is equal
+ * to or lower than the given value.
+ *
+ * @param task_count target value to wait for
+ **/
+ void wait_for_task_count(uint32_t task_count);
+
/**
* Shut down this executor. This will make this executor reject
* all new tasks.
*