diff options
author | Håvard Pettersen <havardpe@oath.com> | 2020-06-14 11:43:34 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2020-06-14 11:43:34 +0000 |
commit | bfc6cfee7ea0dc4b8746573ab6e18fc5941c59c2 (patch) | |
tree | ede34221ac36f440de4fb0ba600bfab1a8f9be19 /eval | |
parent | aacb857c63b9932ecac330d65bdd28d05d458b62 (diff) |
avoid deadlock when using compile cache with blocking executors
- avoid taking cache lock in compile task destructor
- avoid holding cache lock while posting compile task
- add unit test trying to provoke deadlock
- bonus: fix blocking executor implementation
(did not work with threads <= task limit)
Diffstat (limited to 'eval')
-rw-r--r-- | eval/src/tests/eval/compile_cache/compile_cache_test.cpp | 80 | ||||
-rw-r--r-- | eval/src/vespa/eval/eval/llvm/compile_cache.cpp | 37 | ||||
-rw-r--r-- | eval/src/vespa/eval/eval/llvm/compile_cache.h | 33 |
3 files changed, 98 insertions, 52 deletions
diff --git a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp index 1de56e605c9..a0dad889d9a 100644 --- a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp +++ b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp @@ -5,12 +5,16 @@ #include <vespa/eval/eval/test/eval_spec.h> #include <vespa/vespalib/util/time.h> #include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/stringfmt.h> #include <thread> #include <set> using namespace vespalib; using namespace vespalib::eval; +using vespalib::make_string_short::fmt; + struct MyExecutor : public Executor { std::vector<Executor::Task::UP> tasks; Executor::Task::UP execute(Executor::Task::UP task) override { @@ -157,7 +161,7 @@ TEST("require that cache usage works") { } TEST("require that async cache usage works") { - ThreadStackExecutor executor(8, 256*1024); + auto executor = std::make_shared<ThreadStackExecutor>(8, 256*1024); auto binding = CompileCache::bind(executor); CompileCache::Token::UP token_a = CompileCache::compile(*Function::parse("x+y"), PassParams::SEPARATE); EXPECT_EQUAL(5.0, token_a->get().get_function<2>()(2.0, 3.0)); @@ -166,7 +170,6 @@ TEST("require that async cache usage works") { CompileCache::Token::UP token_c = CompileCache::compile(*Function::parse("x+y"), PassParams::SEPARATE); EXPECT_EQUAL(5.0, token_c->get().get_function<2>()(2.0, 3.0)); EXPECT_EQUAL(CompileCache::num_cached(), 2u); - executor.sync(); // wait for compile threads to drop all compile cache tokens token_a.reset(); TEST_DO(verify_cache(2, 2)); token_b.reset(); @@ -176,24 +179,24 @@ TEST("require that async cache usage works") { } TEST("require that compile tasks are run in the most recently bound executor") { - MyExecutor exe1; - MyExecutor exe2; + auto exe1 = std::make_shared<MyExecutor>(); + auto exe2 = std::make_shared<MyExecutor>(); auto token0 = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE); EXPECT_EQUAL(CompileCache::num_bound(), 0u); - EXPECT_EQUAL(exe1.tasks.size(), 0u); - EXPECT_EQUAL(exe2.tasks.size(), 0u); + EXPECT_EQUAL(exe1->tasks.size(), 0u); + EXPECT_EQUAL(exe2->tasks.size(), 0u); { auto bind1 = CompileCache::bind(exe1); auto token1 = CompileCache::compile(*Function::parse("a-b"), PassParams::SEPARATE); EXPECT_EQUAL(CompileCache::num_bound(), 1u); - EXPECT_EQUAL(exe1.tasks.size(), 1u); - EXPECT_EQUAL(exe2.tasks.size(), 0u); + EXPECT_EQUAL(exe1->tasks.size(), 1u); + EXPECT_EQUAL(exe2->tasks.size(), 0u); { auto bind2 = CompileCache::bind(exe2); auto token2 = CompileCache::compile(*Function::parse("a*b"), PassParams::SEPARATE); EXPECT_EQUAL(CompileCache::num_bound(), 2u); - EXPECT_EQUAL(exe1.tasks.size(), 1u); - EXPECT_EQUAL(exe2.tasks.size(), 1u); + EXPECT_EQUAL(exe1->tasks.size(), 1u); + EXPECT_EQUAL(exe2->tasks.size(), 1u); } EXPECT_EQUAL(CompileCache::num_bound(), 1u); } @@ -201,9 +204,9 @@ TEST("require that compile tasks are run in the most recently bound executor") { } TEST("require that executors may be unbound in any order") { - MyExecutor exe1; - MyExecutor exe2; - MyExecutor exe3; + auto exe1 = std::make_shared<MyExecutor>(); + auto exe2 = std::make_shared<MyExecutor>(); + auto exe3 = std::make_shared<MyExecutor>(); auto bind1 = CompileCache::bind(exe1); auto bind2 = CompileCache::bind(exe2); auto bind3 = CompileCache::bind(exe3); @@ -213,13 +216,13 @@ TEST("require that executors may be unbound in any order") { bind3.reset(); EXPECT_EQUAL(CompileCache::num_bound(), 1u); auto token = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE); - EXPECT_EQUAL(exe1.tasks.size(), 1u); - EXPECT_EQUAL(exe2.tasks.size(), 0u); - EXPECT_EQUAL(exe3.tasks.size(), 0u); + EXPECT_EQUAL(exe1->tasks.size(), 1u); + EXPECT_EQUAL(exe2->tasks.size(), 0u); + EXPECT_EQUAL(exe3->tasks.size(), 0u); } TEST("require that the same executor can be bound multiple times") { - MyExecutor exe1; + auto exe1 = std::make_shared<MyExecutor>(); auto bind1 = CompileCache::bind(exe1); auto bind2 = CompileCache::bind(exe1); auto bind3 = CompileCache::bind(exe1); @@ -230,7 +233,7 @@ TEST("require that the same executor can be bound multiple times") { EXPECT_EQUAL(CompileCache::num_bound(), 1u); auto token = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE); EXPECT_EQUAL(CompileCache::num_bound(), 1u); - EXPECT_EQUAL(exe1.tasks.size(), 1u); + EXPECT_EQUAL(exe1->tasks.size(), 1u); } struct CompileCheck : test::EvalSpec::EvalTest { @@ -286,9 +289,9 @@ TEST_F("compile sequentially, then run all conformance tests", test::EvalSpec()) TEST_F("compile concurrently (8 threads), then run all conformance tests", test::EvalSpec()) { f1.add_all_cases(); - ThreadStackExecutor executor(8, 256*1024); + auto executor = std::make_shared<ThreadStackExecutor>(8, 256*1024); auto binding = CompileCache::bind(executor); - while (executor.num_idle_workers() < 8) { + while (executor->num_idle_workers() < 8) { std::this_thread::sleep_for(1ms); } for (size_t i = 0; i < 2; ++i) { @@ -305,6 +308,43 @@ TEST_F("compile concurrently (8 threads), then run all conformance tests", test: } } +struct MyCompileTask : public Executor::Task { + size_t seed; + size_t loop; + MyCompileTask(size_t seed_in, size_t loop_in) : seed(seed_in), loop(loop_in) {} + void run() override { + for (size_t i = 0; i < loop; ++i) { + // use custom constant to make a unique function that needs compilation + auto token = CompileCache::compile(*Function::parse(fmt("%zu", seed + i)), PassParams::SEPARATE); + } + } +}; + +TEST_MT_FF("require that deadlock is avoided with blocking executor", 8, std::shared_ptr<Executor>(nullptr), TimeBomb(300)) { + size_t loop = 16; + if (thread_id == 0) { + auto t0 = steady_clock::now(); + f1 = std::make_shared<BlockingThreadStackExecutor>(2, 256*1024, 3); + auto binding = CompileCache::bind(f1); + TEST_BARRIER(); // #1 + for (size_t i = 0; i < num_threads; ++i) { + f1->execute(std::make_unique<MyCompileTask>(i * loop, loop)); + } + TEST_BARRIER(); // #2 + auto t1 = steady_clock::now(); + fprintf(stderr, "deadlock test took %" PRIu64 " ms\n", count_ms(t1 - t0)); + + } else { + TEST_BARRIER(); // #1 + size_t seed = (10000 + (thread_id * loop)); + for (size_t i = 0; i < loop; ++i) { + // use custom constant to make a unique function that needs compilation + auto token = CompileCache::compile(*Function::parse(fmt("%zu", seed + i)), PassParams::SEPARATE); + } + TEST_BARRIER(); // #2 + } +} + //----------------------------------------------------------------------------- TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp index 41106651024..e2674a6e4d6 100644 --- a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp +++ b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp @@ -10,14 +10,14 @@ namespace eval { std::mutex CompileCache::_lock{}; CompileCache::Map CompileCache::_cached{}; uint64_t CompileCache::_executor_tag{0}; -std::vector<std::pair<uint64_t,Executor*>> CompileCache::_executor_stack{}; +std::vector<std::pair<uint64_t,std::shared_ptr<Executor>>> CompileCache::_executor_stack{}; const CompiledFunction & CompileCache::Value::wait_for_result() { - std::unique_lock<std::mutex> guard(result_lock); - cond.wait(guard, [this](){ return bool(compiled_function); }); - return *compiled_function; + std::unique_lock<std::mutex> guard(result->lock); + result->cond.wait(guard, [this](){ return bool(result->compiled_function); }); + return *(result->compiled_function); } void @@ -30,10 +30,10 @@ CompileCache::release(Map::iterator entry) } uint64_t -CompileCache::attach_executor(Executor &executor) +CompileCache::attach_executor(std::shared_ptr<Executor> executor) { std::lock_guard<std::mutex> guard(_lock); - _executor_stack.emplace_back(++_executor_tag, &executor); + _executor_stack.emplace_back(++_executor_tag, std::move(executor)); return _executor_tag; } @@ -52,6 +52,7 @@ CompileCache::compile(const Function &function, PassParams pass_params) { Token::UP token; Executor::Task::UP task; + std::shared_ptr<Executor> executor; vespalib::string key = gen_key(function, pass_params); { std::lock_guard<std::mutex> guard(_lock); @@ -63,14 +64,15 @@ CompileCache::compile(const Function &function, PassParams pass_params) auto res = _cached.emplace(std::move(key), Value::ctor_tag()); assert(res.second); token = std::make_unique<Token>(res.first, Token::ctor_tag()); - ++(res.first->second.num_refs); - task = std::make_unique<CompileTask>(function, pass_params, - std::make_unique<Token>(res.first, Token::ctor_tag())); + task = std::make_unique<CompileTask>(function, pass_params, res.first->second.result); if (!_executor_stack.empty()) { - task = _executor_stack.back().second->execute(std::move(task)); + executor = _executor_stack.back().second; } } } + if (executor) { + task = executor->execute(std::move(task)); + } if (task) { std::thread([&task](){ task.get()->run(); }).join(); } @@ -84,7 +86,7 @@ CompileCache::wait_pending() { std::lock_guard<std::mutex> guard(_lock); for (auto entry = _cached.begin(); entry != _cached.end(); ++entry) { - if (entry->second.cf.load(std::memory_order_acquire) == nullptr) { + if (entry->second.result->cf.load(std::memory_order_acquire) == nullptr) { ++(entry->second.num_refs); pending.push_back(std::make_unique<Token>(entry, Token::ctor_tag())); } @@ -129,7 +131,7 @@ CompileCache::count_pending() std::lock_guard<std::mutex> guard(_lock); size_t pending = 0; for (const auto &entry: _cached) { - if (entry.second.cf.load(std::memory_order_acquire) == nullptr) { + if (entry.second.result->cf.load(std::memory_order_acquire) == nullptr) { ++pending; } } @@ -139,12 +141,11 @@ CompileCache::count_pending() void CompileCache::CompileTask::run() { - auto &entry = token->_entry->second; - auto result = std::make_unique<CompiledFunction>(*function, pass_params); - std::lock_guard<std::mutex> guard(entry.result_lock); - entry.compiled_function = std::move(result); - entry.cf.store(entry.compiled_function.get(), std::memory_order_release); - entry.cond.notify_all(); + auto compiled = std::make_unique<CompiledFunction>(*function, pass_params); + std::lock_guard<std::mutex> guard(result->lock); + result->compiled_function = std::move(compiled); + result->cf.store(result->compiled_function.get(), std::memory_order_release); + result->cond.notify_all(); } } // namespace vespalib::eval diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.h b/eval/src/vespa/eval/eval/llvm/compile_cache.h index e8f87e454d3..61d0cc83d94 100644 --- a/eval/src/vespa/eval/eval/llvm/compile_cache.h +++ b/eval/src/vespa/eval/eval/llvm/compile_cache.h @@ -23,17 +23,22 @@ class CompileCache { private: using Key = vespalib::string; - struct Value { - size_t num_refs; + struct Result { + using SP = std::shared_ptr<Result>; std::atomic<const CompiledFunction *> cf; - std::mutex result_lock; + std::mutex lock; std::condition_variable cond; CompiledFunction::UP compiled_function; + Result() : cf(nullptr), lock(), cond(), compiled_function(nullptr) {} + }; + struct Value { + size_t num_refs; + Result::SP result; struct ctor_tag {}; - Value(ctor_tag) : num_refs(1), cf(nullptr), result_lock(), cond(), compiled_function() {} + Value(ctor_tag) : num_refs(1), result(std::make_shared<Result>()) {} const CompiledFunction &wait_for_result(); const CompiledFunction &get() { - const CompiledFunction *ptr = cf.load(std::memory_order_acquire); + const CompiledFunction *ptr = result->cf.load(std::memory_order_acquire); if (ptr == nullptr) { return wait_for_result(); } @@ -44,10 +49,10 @@ private: static std::mutex _lock; static Map _cached; static uint64_t _executor_tag; - static std::vector<std::pair<uint64_t,Executor*>> _executor_stack; + static std::vector<std::pair<uint64_t,std::shared_ptr<Executor>>> _executor_stack; static void release(Map::iterator entry); - static uint64_t attach_executor(Executor &executor); + static uint64_t attach_executor(std::shared_ptr<Executor> executor); static void detach_executor(uint64_t tag); public: @@ -55,7 +60,6 @@ public: { private: friend class CompileCache; - friend class CompileTask; struct ctor_tag {}; CompileCache::Map::iterator _entry; public: @@ -80,14 +84,15 @@ public: ExecutorBinding &operator=(ExecutorBinding &&) = delete; ExecutorBinding &operator=(const ExecutorBinding &) = delete; using UP = std::unique_ptr<ExecutorBinding>; - explicit ExecutorBinding(Executor &executor, ctor_tag) : _tag(attach_executor(executor)) {} + explicit ExecutorBinding(std::shared_ptr<Executor> executor, ctor_tag) + : _tag(attach_executor(std::move(executor))) {} ~ExecutorBinding() { detach_executor(_tag); } }; static Token::UP compile(const Function &function, PassParams pass_params); static void wait_pending(); - static ExecutorBinding::UP bind(Executor &executor) { - return std::make_unique<ExecutorBinding>(executor, ExecutorBinding::ctor_tag()); + static ExecutorBinding::UP bind(std::shared_ptr<Executor> executor) { + return std::make_unique<ExecutorBinding>(std::move(executor), ExecutorBinding::ctor_tag()); } static size_t num_cached(); static size_t num_bound(); @@ -98,9 +103,9 @@ private: struct CompileTask : public Executor::Task { std::shared_ptr<Function const> function; PassParams pass_params; - Token::UP token; - CompileTask(const Function &function_in, PassParams pass_params_in, Token::UP token_in) - : function(function_in.shared_from_this()), pass_params(pass_params_in), token(std::move(token_in)) {} + Result::SP result; + CompileTask(const Function &function_in, PassParams pass_params_in, Result::SP result_in) + : function(function_in.shared_from_this()), pass_params(pass_params_in), result(std::move(result_in)) {} void run() override; }; }; |