summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-02-12 15:13:20 +0100
committerGitHub <noreply@github.com>2019-02-12 15:13:20 +0100
commit3bb08e98cf8e0d9a37bb11dbedbe5741355d72d0 (patch)
treefae779adae40aa3ebba056ae3bbd32b05add58e1
parentba4dffd8c1f3ec5586fc7bf8699ccbc6e70ba453 (diff)
parent32cffa643ead82167e65832e5d8db1c3a0d05307 (diff)
Merge pull request #8474 from vespa-engine/havardpe/avoid-self-deadlock-in-blocking-executor
do not block worker threads posting tasks on owning executor
-rw-r--r--vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp29
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp2
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h6
4 files changed, 42 insertions, 1 deletions
diff --git a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
index 4389ca810ac..c2a3e66c671 100644
--- a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
@@ -137,4 +137,33 @@ TEST_F("require that executor thread stack tag can be set", BlockingThreadStackE
}
}
+TEST_F("require that tasks posted from internal worker thread will not block executor", TimeBomb(60)) {
+ size_t cnt = 0;
+ Gate fork_done;
+ BlockingThreadStackExecutor executor(1, 128*1024, 10);
+ struct IncTask : Executor::Task {
+ size_t &cnt;
+ IncTask(size_t &cnt_in) : cnt(cnt_in) {}
+ void run() override { ++cnt; }
+ };
+ struct ForkTask : Executor::Task {
+ Executor &executor;
+ Gate &fork_done;
+ size_t &cnt;
+ ForkTask(Executor &executor_in, Gate &fork_done_in, size_t &cnt_in)
+ : executor(executor_in), fork_done(fork_done_in), cnt(cnt_in) {}
+ void run() override {
+ for (size_t i = 0; i < 32; ++i) {
+ executor.execute(std::make_unique<IncTask>(cnt));
+ }
+ fork_done.countDown();
+ }
+ };
+ // post 32 internal tasks on a blocking executor with tasklimit 10
+ executor.execute(std::make_unique<ForkTask>(executor, fork_done, cnt));
+ fork_done.await();
+ executor.sync();
+ EXPECT_EQUAL(cnt, 32u);
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
index 2570a89da45..33d6e46a244 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
@@ -9,7 +9,7 @@ VESPA_THREAD_STACK_TAG(unnamed_blocking_executor);
bool
BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard)
{
- while (!closed() && !isRoomForNewTask()) {
+ while (!closed() && !isRoomForNewTask() && !owns_this_thread()) {
guard.wait();
}
return (!closed());
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index ea657d4ec76..dfd835e0e8e 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -43,6 +43,10 @@ ThreadStackExecutorBase::BlockedThread::unblock()
//-----------------------------------------------------------------------------
+thread_local ThreadStackExecutorBase *ThreadStackExecutorBase::_master = nullptr;
+
+//-----------------------------------------------------------------------------
+
void
ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_thread)
{
@@ -114,6 +118,7 @@ void
ThreadStackExecutorBase::run()
{
Worker worker;
+ _master = this;
worker.verify(/* idle: */ true);
while (obtainTask(worker)) {
worker.verify(/* idle: */ false);
@@ -122,6 +127,7 @@ ThreadStackExecutorBase::run()
}
_executorCompletion.await(); // to allow unsafe signaling
worker.verify(/* idle: */ true);
+ _master = nullptr;
}
//-----------------------------------------------------------------------------
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index e0effb1b52e..8ee08ed3929 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -106,6 +106,7 @@ private:
uint32_t _taskLimit;
bool _closed;
std::unique_ptr<thread::ThreadInit> _thread_init;
+ static thread_local ThreadStackExecutorBase *_master;
void block_thread(const LockGuard &, BlockedThread &blocked_thread);
void unblock_threads(const MonitorGuard &);
@@ -185,6 +186,11 @@ protected:
void start(uint32_t threads);
/**
+ * Returns true if the current thread is owned by this executor.
+ **/
+ bool owns_this_thread() const { return (_master == this); }
+
+ /**
* Sets a new upper limit for accepted number of tasks.
*/
void internalSetTaskLimit(uint32_t taskLimit);