From 32cffa643ead82167e65832e5d8db1c3a0d05307 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Tue, 12 Feb 2019 13:35:54 +0000 Subject: do not block worker threads posting tasks on owning executor --- .../executor/blockingthreadstackexecutor_test.cpp | 29 ++++++++++++++++++++++ .../vespalib/util/blockingthreadstackexecutor.cpp | 2 +- .../vespalib/util/threadstackexecutorbase.cpp | 6 +++++ .../vespa/vespalib/util/threadstackexecutorbase.h | 6 +++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp index 4389ca810ac..c2a3e66c671 100644 --- a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp @@ -137,4 +137,33 @@ TEST_F("require that executor thread stack tag can be set", BlockingThreadStackE } } +TEST_F("require that tasks posted from internal worker thread will not block executor", TimeBomb(60)) { + size_t cnt = 0; + Gate fork_done; + BlockingThreadStackExecutor executor(1, 128*1024, 10); + struct IncTask : Executor::Task { + size_t &cnt; + IncTask(size_t &cnt_in) : cnt(cnt_in) {} + void run() override { ++cnt; } + }; + struct ForkTask : Executor::Task { + Executor &executor; + Gate &fork_done; + size_t &cnt; + ForkTask(Executor &executor_in, Gate &fork_done_in, size_t &cnt_in) + : executor(executor_in), fork_done(fork_done_in), cnt(cnt_in) {} + void run() override { + for (size_t i = 0; i < 32; ++i) { + executor.execute(std::make_unique(cnt)); + } + fork_done.countDown(); + } + }; + // post 32 internal tasks on a blocking executor with tasklimit 10 + executor.execute(std::make_unique(executor, fork_done, cnt)); + fork_done.await(); + executor.sync(); + EXPECT_EQUAL(cnt, 32u); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp index 2570a89da45..33d6e46a244 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp @@ -9,7 +9,7 @@ VESPA_THREAD_STACK_TAG(unnamed_blocking_executor); bool BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard) { - while (!closed() && !isRoomForNewTask()) { + while (!closed() && !isRoomForNewTask() && !owns_this_thread()) { guard.wait(); } return (!closed()); diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index ea657d4ec76..dfd835e0e8e 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -43,6 +43,10 @@ ThreadStackExecutorBase::BlockedThread::unblock() //----------------------------------------------------------------------------- +thread_local ThreadStackExecutorBase *ThreadStackExecutorBase::_master = nullptr; + +//----------------------------------------------------------------------------- + void ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_thread) { @@ -114,6 +118,7 @@ void ThreadStackExecutorBase::run() { Worker worker; + _master = this; worker.verify(/* idle: */ true); while (obtainTask(worker)) { worker.verify(/* idle: */ false); @@ -122,6 +127,7 @@ ThreadStackExecutorBase::run() } _executorCompletion.await(); // to allow unsafe signaling worker.verify(/* idle: */ true); + _master = nullptr; } //----------------------------------------------------------------------------- diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index e0effb1b52e..8ee08ed3929 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -106,6 +106,7 @@ private: uint32_t _taskLimit; bool _closed; std::unique_ptr _thread_init; + static thread_local ThreadStackExecutorBase *_master; void block_thread(const LockGuard &, BlockedThread &blocked_thread); void unblock_threads(const MonitorGuard &); @@ -184,6 +185,11 @@ protected: */ void start(uint32_t threads); + /** + * Returns true if the current thread is owned by this executor. + **/ + bool owns_this_thread() const { return (_master == this); } + /** * Sets a new upper limit for accepted number of tasks. */ -- cgit v1.2.3