diff options
author | Håvard Pettersen <havardpe@oath.com> | 2019-02-12 13:35:54 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2019-02-12 13:35:54 +0000 |
commit | 32cffa643ead82167e65832e5d8db1c3a0d05307 (patch) | |
tree | 0e7e1e50f53a0c9ae8d4a452abcaba047b078565 | |
parent | d0f1c4bedb471ac9d1d28f87719c6009321ddf26 (diff) |
do not block worker threads posting tasks on owning executor
4 files changed, 42 insertions, 1 deletions
diff --git a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp index 4389ca810ac..c2a3e66c671 100644 --- a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp @@ -137,4 +137,33 @@ TEST_F("require that executor thread stack tag can be set", BlockingThreadStackE } } +TEST_F("require that tasks posted from internal worker thread will not block executor", TimeBomb(60)) { + size_t cnt = 0; + Gate fork_done; + BlockingThreadStackExecutor executor(1, 128*1024, 10); + struct IncTask : Executor::Task { + size_t &cnt; + IncTask(size_t &cnt_in) : cnt(cnt_in) {} + void run() override { ++cnt; } + }; + struct ForkTask : Executor::Task { + Executor &executor; + Gate &fork_done; + size_t &cnt; + ForkTask(Executor &executor_in, Gate &fork_done_in, size_t &cnt_in) + : executor(executor_in), fork_done(fork_done_in), cnt(cnt_in) {} + void run() override { + for (size_t i = 0; i < 32; ++i) { + executor.execute(std::make_unique<IncTask>(cnt)); + } + fork_done.countDown(); + } + }; + // post 32 internal tasks on a blocking executor with tasklimit 10 + executor.execute(std::make_unique<ForkTask>(executor, fork_done, cnt)); + fork_done.await(); + executor.sync(); + EXPECT_EQUAL(cnt, 32u); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp index 2570a89da45..33d6e46a244 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp @@ -9,7 +9,7 @@ VESPA_THREAD_STACK_TAG(unnamed_blocking_executor); bool BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard) { - while (!closed() && !isRoomForNewTask()) { + while (!closed() && !isRoomForNewTask() && !owns_this_thread()) { guard.wait(); } return (!closed()); diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index ea657d4ec76..dfd835e0e8e 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -43,6 +43,10 @@ ThreadStackExecutorBase::BlockedThread::unblock() //----------------------------------------------------------------------------- +thread_local ThreadStackExecutorBase *ThreadStackExecutorBase::_master = nullptr; + +//----------------------------------------------------------------------------- + void ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_thread) { @@ -114,6 +118,7 @@ void ThreadStackExecutorBase::run() { Worker worker; + _master = this; worker.verify(/* idle: */ true); while (obtainTask(worker)) { worker.verify(/* idle: */ false); @@ -122,6 +127,7 @@ ThreadStackExecutorBase::run() } _executorCompletion.await(); // to allow unsafe signaling worker.verify(/* idle: */ true); + _master = nullptr; } //----------------------------------------------------------------------------- diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index e0effb1b52e..8ee08ed3929 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -106,6 +106,7 @@ private: uint32_t _taskLimit; bool _closed; std::unique_ptr<thread::ThreadInit> _thread_init; + static thread_local ThreadStackExecutorBase *_master; void block_thread(const LockGuard &, BlockedThread &blocked_thread); void unblock_threads(const MonitorGuard &); @@ -185,6 +186,11 @@ protected: void start(uint32_t threads); /** + * Returns true if the current thread is owned by this executor. + **/ + bool owns_this_thread() const { return (_master == this); } + + /** * Sets a new upper limit for accepted number of tasks. */ void internalSetTaskLimit(uint32_t taskLimit); |