diff options
author | Haavard <havardpe@yahoo-inc.com> | 2016-06-22 12:05:14 +0000 |
---|---|---|
committer | Haavard <havardpe@yahoo-inc.com> | 2016-06-22 12:09:36 +0000 |
commit | e708a6f0629eba47a76e722377ddfc305e0b5900 (patch) | |
tree | 7a901b7410b3c36079257058b3b6f9bc213c5763 /vespalib | |
parent | 4b1530f59452e5896a11ac390202dc0f38bbfb1f (diff) |
enable waiting until there are few enough active tasks
Enables task throttling without making the scheduler itself blocking
Diffstat (limited to 'vespalib')
3 files changed, 119 insertions, 0 deletions
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index 0eb08a80f41..5deab82b6fc 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -116,4 +116,45 @@ TEST_F("requireThatNewTasksAreDroppedAfterShutdown", MyState()) { TEST_DO(f1.open().shutdown().execute(5).sync().check(5, 0, 0, 0)); } + +struct WaitTask : public Executor::Task { + Gate &gate; + WaitTask(Gate &g) : gate(g) {} + virtual void run() { gate.await(); } +}; + +struct WaitState { + ThreadStackExecutor executor; + std::vector<Gate> block_task; + std::vector<Gate> wait_done; + WaitState(size_t num_threads) + : executor(num_threads / 2, 128000), block_task(num_threads - 2), wait_done(num_threads - 1) + { + for (auto &gate: block_task) { + auto result = executor.execute(std::make_unique<WaitTask>(gate)); + ASSERT_TRUE(result.get() == nullptr); + } + } + void wait(size_t count) { + executor.wait_for_task_count(count); + wait_done[count].countDown(); + } +}; + +TEST_MT_F("require that threads can wait for a specific task count", 7, WaitState(num_threads)) { + if (thread_id == 0) { + for (size_t next_done = (num_threads - 2); next_done-- > 0;) { + if (next_done < f1.block_task.size()) { + f1.block_task[f1.block_task.size() - 1 - next_done].countDown(); + } + EXPECT_TRUE(f1.wait_done[next_done].await(25000)); + for (size_t i = 0; i < next_done; ++i) { + EXPECT_TRUE(!f1.wait_done[i].await(20)); + } + } + } else { + f1.wait(thread_id - 1); + } +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index bee1d9c17ec..780f0ed5dd0 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -6,6 +6,47 @@ namespace vespalib { void +ThreadStackExecutorBase::BlockedThread::wait() const +{ + MonitorGuard guard(monitor); + while (blocked) { + guard.wait(); + } +} + +void +ThreadStackExecutorBase::BlockedThread::unblock() +{ + MonitorGuard guard(monitor); + blocked = false; + guard.signal(); +} + +//----------------------------------------------------------------------------- + +void +ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_thread) +{ + auto pos = _blocked.begin(); + while ((pos != _blocked.end()) && + ((*pos)->wait_task_count < blocked_thread.wait_task_count)) + { + ++pos; + } + _blocked.insert(pos, &blocked_thread); +} + +void +ThreadStackExecutorBase::unblock_threads(const MonitorGuard &) +{ + while (!_blocked.empty() && (_taskCount <= _blocked.back()->wait_task_count)) { + BlockedThread &blocked_thread = *(_blocked.back()); + _blocked.pop_back(); + blocked_thread.unblock(); + } +} + +void ThreadStackExecutorBase::assignTask(const TaggedTask &task, Worker &worker) { MonitorGuard monitor(worker.monitor); @@ -26,6 +67,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker) _barrier.completeEvent(worker.task.token); worker.task.task = 0; } + unblock_threads(monitor); if (!_tasks.empty()) { worker.task = _tasks.front(); _tasks.pop(); @@ -168,6 +210,19 @@ ThreadStackExecutorBase::sync() } void +ThreadStackExecutorBase::wait_for_task_count(uint32_t task_count) +{ + LockGuard lock(_monitor); + if (_taskCount <= task_count) { + return; + } + BlockedThread self(task_count); + block_thread(lock, self); + lock.unlock(); // <- UNLOCK + self.wait(); +} + +void ThreadStackExecutorBase::cleanup() { shutdown().sync(); @@ -179,6 +234,7 @@ ThreadStackExecutorBase::~ThreadStackExecutorBase() { assert(_pool.isClosed()); assert(_taskCount == 0); + assert(_blocked.empty()); } } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index f66cd74dae1..215a8377fc6 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -52,17 +52,31 @@ private: void completeBarrier() { gate.countDown(); } }; + struct BlockedThread { + const uint32_t wait_task_count; + Monitor monitor; + bool blocked; + BlockedThread(uint32_t wait_task_count_in) + : wait_task_count(wait_task_count_in), monitor(), blocked(true) {} + void wait() const; + void unblock(); + }; + FastOS_ThreadPool _pool; Monitor _monitor; Stats _stats; Gate _executorCompletion; ArrayQueue<TaggedTask> _tasks; ArrayQueue<Worker*> _workers; + std::vector<BlockedThread*> _blocked; EventBarrier<BarrierCompletion> _barrier; uint32_t _taskCount; uint32_t _taskLimit; bool _closed; + void block_thread(const LockGuard &, BlockedThread &blocked_thread); + void unblock_threads(const MonitorGuard &); + /** * Assign the given task to the given idle worker. This will wake * up a worker thread that is blocked in the obtainTask function. @@ -153,6 +167,14 @@ public: virtual ThreadStackExecutorBase &sync(); /** + * Block the calling thread until the current task count is equal + * to or lower than the given value. + * + * @param task_count target value to wait for + **/ + void wait_for_task_count(uint32_t task_count); + + /** * Shut down this executor. This will make this executor reject * all new tasks. * |