author     Henning Baldersheim <balder@yahoo-inc.com>   2020-06-14 17:36:17 +0200
committer  GitHub <noreply@github.com>                  2020-06-14 17:36:17 +0200
commit     0e4a1ad6ed04f8b51e6c79b4b82b597bcdc38b0c (patch)
tree       ede34221ac36f440de4fb0ba600bfab1a8f9be19
parent     aacb857c63b9932ecac330d65bdd28d05d458b62 (diff)
parent     bfc6cfee7ea0dc4b8746573ab6e18fc5941c59c2 (diff)
Merge pull request #13580 from vespa-engine/havardpe/fix-compile-cache-deadlock-with-blocking-executor
avoid deadlock when using compile cache with blocking executors
-rw-r--r--  eval/src/tests/eval/compile_cache/compile_cache_test.cpp      80
-rw-r--r--  eval/src/vespa/eval/eval/llvm/compile_cache.cpp                37
-rw-r--r--  eval/src/vespa/eval/eval/llvm/compile_cache.h                  33
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/proton.cpp        4
-rw-r--r--  searchcore/src/vespa/searchcore/proton/server/proton.h          2
-rw-r--r--  vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp    2
6 files changed, 102 insertions(+), 56 deletions(-)
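
Reading aid (not part of the commit): the deadlock arises when CompileCache::compile() calls execute() on a blocking, bounded executor while still holding the cache lock; with the queue full, execute() blocks, and the executor's workers cannot free a slot because finishing their compile tasks needs that same lock. The patch therefore moves the execute() call outside the lock, lets compile tasks share only a small Result object instead of a cache token, and wakes blocked producers as soon as a worker completes a task. The minimal sketch below illustrates the safe ordering; BoundedExecutor and ToyCache are hypothetical stand-ins, not Vespa types.

// --- sketch (not part of the patch) -----------------------------------------
// Hypothetical BoundedExecutor / ToyCache types that only mimic the shape of
// BlockingThreadStackExecutor and CompileCache. The point: execute() on a full
// bounded queue blocks, and the queued tasks need the cache lock to finish, so
// the submitter must never call execute() while holding that lock.
#include <condition_variable>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

class BoundedExecutor {
    std::mutex _mutex;
    std::condition_variable _not_full;
    std::condition_variable _not_empty;
    std::deque<std::function<void()>> _queue;
    size_t _limit;
    bool _closed = false;
    std::vector<std::thread> _workers;
public:
    BoundedExecutor(size_t threads, size_t limit) : _limit(limit) {
        for (size_t i = 0; i < threads; ++i) {
            _workers.emplace_back([this] {
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> guard(_mutex);
                        _not_empty.wait(guard, [this] { return _closed || !_queue.empty(); });
                        if (_queue.empty()) return;
                        task = std::move(_queue.front());
                        _queue.pop_front();
                        _not_full.notify_one(); // free slot: wake a blocked producer right away
                    }
                    task();
                }
            });
        }
    }
    void execute(std::function<void()> task) {
        std::unique_lock<std::mutex> guard(_mutex);
        _not_full.wait(guard, [this] { return _queue.size() < _limit; }); // may block here
        _queue.push_back(std::move(task));
        _not_empty.notify_one();
    }
    ~BoundedExecutor() {
        { std::lock_guard<std::mutex> guard(_mutex); _closed = true; }
        _not_empty.notify_all();
        for (auto &worker : _workers) worker.join();
    }
};

struct ToyCache {
    std::mutex lock;                          // analogue of CompileCache::_lock
    std::map<std::string, int> results;       // analogue of the cache map
    std::shared_ptr<BoundedExecutor> executor;

    void compile(const std::string &key) {
        std::shared_ptr<BoundedExecutor> exe;
        {
            std::lock_guard<std::mutex> guard(lock);
            exe = executor;                   // pick up the bound executor under the lock ...
        }
        // ... but submit with the lock released: the task below needs 'lock' again to
        // publish its result, so blocking in execute() while holding 'lock' would deadlock.
        exe->execute([this, key] {
            int value = static_cast<int>(key.size()); // stand-in for the expensive compile
            std::lock_guard<std::mutex> guard(lock);
            results[key] = value;
        });
    }
};

int main() {
    ToyCache cache;
    cache.executor = std::make_shared<BoundedExecutor>(2, 3);
    for (int i = 0; i < 100; ++i) {
        cache.compile("key-" + std::to_string(i));
    }
}
// --- end sketch --------------------------------------------------------------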
diff --git a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
index 1de56e605c9..a0dad889d9a 100644
--- a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
+++ b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
@@ -5,12 +5,16 @@
#include <vespa/eval/eval/test/eval_spec.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/stringfmt.h>
#include <thread>
#include <set>
using namespace vespalib;
using namespace vespalib::eval;
+using vespalib::make_string_short::fmt;
+
struct MyExecutor : public Executor {
std::vector<Executor::Task::UP> tasks;
Executor::Task::UP execute(Executor::Task::UP task) override {
@@ -157,7 +161,7 @@ TEST("require that cache usage works") {
}
TEST("require that async cache usage works") {
- ThreadStackExecutor executor(8, 256*1024);
+ auto executor = std::make_shared<ThreadStackExecutor>(8, 256*1024);
auto binding = CompileCache::bind(executor);
CompileCache::Token::UP token_a = CompileCache::compile(*Function::parse("x+y"), PassParams::SEPARATE);
EXPECT_EQUAL(5.0, token_a->get().get_function<2>()(2.0, 3.0));
@@ -166,7 +170,6 @@ TEST("require that async cache usage works") {
CompileCache::Token::UP token_c = CompileCache::compile(*Function::parse("x+y"), PassParams::SEPARATE);
EXPECT_EQUAL(5.0, token_c->get().get_function<2>()(2.0, 3.0));
EXPECT_EQUAL(CompileCache::num_cached(), 2u);
- executor.sync(); // wait for compile threads to drop all compile cache tokens
token_a.reset();
TEST_DO(verify_cache(2, 2));
token_b.reset();
@@ -176,24 +179,24 @@ TEST("require that async cache usage works") {
}
TEST("require that compile tasks are run in the most recently bound executor") {
- MyExecutor exe1;
- MyExecutor exe2;
+ auto exe1 = std::make_shared<MyExecutor>();
+ auto exe2 = std::make_shared<MyExecutor>();
auto token0 = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE);
EXPECT_EQUAL(CompileCache::num_bound(), 0u);
- EXPECT_EQUAL(exe1.tasks.size(), 0u);
- EXPECT_EQUAL(exe2.tasks.size(), 0u);
+ EXPECT_EQUAL(exe1->tasks.size(), 0u);
+ EXPECT_EQUAL(exe2->tasks.size(), 0u);
{
auto bind1 = CompileCache::bind(exe1);
auto token1 = CompileCache::compile(*Function::parse("a-b"), PassParams::SEPARATE);
EXPECT_EQUAL(CompileCache::num_bound(), 1u);
- EXPECT_EQUAL(exe1.tasks.size(), 1u);
- EXPECT_EQUAL(exe2.tasks.size(), 0u);
+ EXPECT_EQUAL(exe1->tasks.size(), 1u);
+ EXPECT_EQUAL(exe2->tasks.size(), 0u);
{
auto bind2 = CompileCache::bind(exe2);
auto token2 = CompileCache::compile(*Function::parse("a*b"), PassParams::SEPARATE);
EXPECT_EQUAL(CompileCache::num_bound(), 2u);
- EXPECT_EQUAL(exe1.tasks.size(), 1u);
- EXPECT_EQUAL(exe2.tasks.size(), 1u);
+ EXPECT_EQUAL(exe1->tasks.size(), 1u);
+ EXPECT_EQUAL(exe2->tasks.size(), 1u);
}
EXPECT_EQUAL(CompileCache::num_bound(), 1u);
}
@@ -201,9 +204,9 @@ TEST("require that compile tasks are run in the most recently bound executor") {
}
TEST("require that executors may be unbound in any order") {
- MyExecutor exe1;
- MyExecutor exe2;
- MyExecutor exe3;
+ auto exe1 = std::make_shared<MyExecutor>();
+ auto exe2 = std::make_shared<MyExecutor>();
+ auto exe3 = std::make_shared<MyExecutor>();
auto bind1 = CompileCache::bind(exe1);
auto bind2 = CompileCache::bind(exe2);
auto bind3 = CompileCache::bind(exe3);
@@ -213,13 +216,13 @@ TEST("require that executors may be unbound in any order") {
bind3.reset();
EXPECT_EQUAL(CompileCache::num_bound(), 1u);
auto token = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE);
- EXPECT_EQUAL(exe1.tasks.size(), 1u);
- EXPECT_EQUAL(exe2.tasks.size(), 0u);
- EXPECT_EQUAL(exe3.tasks.size(), 0u);
+ EXPECT_EQUAL(exe1->tasks.size(), 1u);
+ EXPECT_EQUAL(exe2->tasks.size(), 0u);
+ EXPECT_EQUAL(exe3->tasks.size(), 0u);
}
TEST("require that the same executor can be bound multiple times") {
- MyExecutor exe1;
+ auto exe1 = std::make_shared<MyExecutor>();
auto bind1 = CompileCache::bind(exe1);
auto bind2 = CompileCache::bind(exe1);
auto bind3 = CompileCache::bind(exe1);
@@ -230,7 +233,7 @@ TEST("require that the same executor can be bound multiple times") {
EXPECT_EQUAL(CompileCache::num_bound(), 1u);
auto token = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE);
EXPECT_EQUAL(CompileCache::num_bound(), 1u);
- EXPECT_EQUAL(exe1.tasks.size(), 1u);
+ EXPECT_EQUAL(exe1->tasks.size(), 1u);
}
struct CompileCheck : test::EvalSpec::EvalTest {
@@ -286,9 +289,9 @@ TEST_F("compile sequentially, then run all conformance tests", test::EvalSpec())
TEST_F("compile concurrently (8 threads), then run all conformance tests", test::EvalSpec()) {
f1.add_all_cases();
- ThreadStackExecutor executor(8, 256*1024);
+ auto executor = std::make_shared<ThreadStackExecutor>(8, 256*1024);
auto binding = CompileCache::bind(executor);
- while (executor.num_idle_workers() < 8) {
+ while (executor->num_idle_workers() < 8) {
std::this_thread::sleep_for(1ms);
}
for (size_t i = 0; i < 2; ++i) {
@@ -305,6 +308,43 @@ TEST_F("compile concurrently (8 threads), then run all conformance tests", test:
}
}
+struct MyCompileTask : public Executor::Task {
+ size_t seed;
+ size_t loop;
+ MyCompileTask(size_t seed_in, size_t loop_in) : seed(seed_in), loop(loop_in) {}
+ void run() override {
+ for (size_t i = 0; i < loop; ++i) {
+ // use custom constant to make a unique function that needs compilation
+ auto token = CompileCache::compile(*Function::parse(fmt("%zu", seed + i)), PassParams::SEPARATE);
+ }
+ }
+};
+
+TEST_MT_FF("require that deadlock is avoided with blocking executor", 8, std::shared_ptr<Executor>(nullptr), TimeBomb(300)) {
+ size_t loop = 16;
+ if (thread_id == 0) {
+ auto t0 = steady_clock::now();
+ f1 = std::make_shared<BlockingThreadStackExecutor>(2, 256*1024, 3);
+ auto binding = CompileCache::bind(f1);
+ TEST_BARRIER(); // #1
+ for (size_t i = 0; i < num_threads; ++i) {
+ f1->execute(std::make_unique<MyCompileTask>(i * loop, loop));
+ }
+ TEST_BARRIER(); // #2
+ auto t1 = steady_clock::now();
+ fprintf(stderr, "deadlock test took %" PRIu64 " ms\n", count_ms(t1 - t0));
+
+ } else {
+ TEST_BARRIER(); // #1
+ size_t seed = (10000 + (thread_id * loop));
+ for (size_t i = 0; i < loop; ++i) {
+ // use custom constant to make a unique function that needs compilation
+ auto token = CompileCache::compile(*Function::parse(fmt("%zu", seed + i)), PassParams::SEPARATE);
+ }
+ TEST_BARRIER(); // #2
+ }
+}
+
//-----------------------------------------------------------------------------
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
index 41106651024..e2674a6e4d6 100644
--- a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
+++ b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
@@ -10,14 +10,14 @@ namespace eval {
std::mutex CompileCache::_lock{};
CompileCache::Map CompileCache::_cached{};
uint64_t CompileCache::_executor_tag{0};
-std::vector<std::pair<uint64_t,Executor*>> CompileCache::_executor_stack{};
+std::vector<std::pair<uint64_t,std::shared_ptr<Executor>>> CompileCache::_executor_stack{};
const CompiledFunction &
CompileCache::Value::wait_for_result()
{
- std::unique_lock<std::mutex> guard(result_lock);
- cond.wait(guard, [this](){ return bool(compiled_function); });
- return *compiled_function;
+ std::unique_lock<std::mutex> guard(result->lock);
+ result->cond.wait(guard, [this](){ return bool(result->compiled_function); });
+ return *(result->compiled_function);
}
void
@@ -30,10 +30,10 @@ CompileCache::release(Map::iterator entry)
}
uint64_t
-CompileCache::attach_executor(Executor &executor)
+CompileCache::attach_executor(std::shared_ptr<Executor> executor)
{
std::lock_guard<std::mutex> guard(_lock);
- _executor_stack.emplace_back(++_executor_tag, &executor);
+ _executor_stack.emplace_back(++_executor_tag, std::move(executor));
return _executor_tag;
}
@@ -52,6 +52,7 @@ CompileCache::compile(const Function &function, PassParams pass_params)
{
Token::UP token;
Executor::Task::UP task;
+ std::shared_ptr<Executor> executor;
vespalib::string key = gen_key(function, pass_params);
{
std::lock_guard<std::mutex> guard(_lock);
@@ -63,14 +64,15 @@ CompileCache::compile(const Function &function, PassParams pass_params)
auto res = _cached.emplace(std::move(key), Value::ctor_tag());
assert(res.second);
token = std::make_unique<Token>(res.first, Token::ctor_tag());
- ++(res.first->second.num_refs);
- task = std::make_unique<CompileTask>(function, pass_params,
- std::make_unique<Token>(res.first, Token::ctor_tag()));
+ task = std::make_unique<CompileTask>(function, pass_params, res.first->second.result);
if (!_executor_stack.empty()) {
- task = _executor_stack.back().second->execute(std::move(task));
+ executor = _executor_stack.back().second;
}
}
}
+ if (executor) {
+ task = executor->execute(std::move(task));
+ }
if (task) {
std::thread([&task](){ task.get()->run(); }).join();
}
@@ -84,7 +86,7 @@ CompileCache::wait_pending()
{
std::lock_guard<std::mutex> guard(_lock);
for (auto entry = _cached.begin(); entry != _cached.end(); ++entry) {
- if (entry->second.cf.load(std::memory_order_acquire) == nullptr) {
+ if (entry->second.result->cf.load(std::memory_order_acquire) == nullptr) {
++(entry->second.num_refs);
pending.push_back(std::make_unique<Token>(entry, Token::ctor_tag()));
}
@@ -129,7 +131,7 @@ CompileCache::count_pending()
std::lock_guard<std::mutex> guard(_lock);
size_t pending = 0;
for (const auto &entry: _cached) {
- if (entry.second.cf.load(std::memory_order_acquire) == nullptr) {
+ if (entry.second.result->cf.load(std::memory_order_acquire) == nullptr) {
++pending;
}
}
@@ -139,12 +141,11 @@ CompileCache::count_pending()
void
CompileCache::CompileTask::run()
{
- auto &entry = token->_entry->second;
- auto result = std::make_unique<CompiledFunction>(*function, pass_params);
- std::lock_guard<std::mutex> guard(entry.result_lock);
- entry.compiled_function = std::move(result);
- entry.cf.store(entry.compiled_function.get(), std::memory_order_release);
- entry.cond.notify_all();
+ auto compiled = std::make_unique<CompiledFunction>(*function, pass_params);
+ std::lock_guard<std::mutex> guard(result->lock);
+ result->compiled_function = std::move(compiled);
+ result->cf.store(result->compiled_function.get(), std::memory_order_release);
+ result->cond.notify_all();
}
} // namespace vespalib::eval
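
Note on the compile_cache.cpp hunk above: the task and the bound executor are now picked up while _lock is held, but the potentially blocking execute() call happens only after the guard is released; if no executor is bound (or the task is handed back), the compile still runs synchronously in a helper thread. A rough, self-contained sketch of that control flow follows; Executor and compile_one are hypothetical stand-ins, and the inline execute() here is only a placeholder for a call that may block.

// --- sketch (not part of the patch) -----------------------------------------
#include <functional>
#include <memory>
#include <mutex>
#include <thread>

struct Executor {
    // stand-in only: a real blocking executor could stall here until queue space
    // frees up; returning nullptr means the task was accepted.
    std::unique_ptr<std::function<void()>> execute(std::unique_ptr<std::function<void()>> task) {
        (*task)();
        return nullptr;
    }
};

std::mutex cache_lock;                        // analogue of CompileCache::_lock
std::shared_ptr<Executor> bound_executor;     // analogue of _executor_stack.back().second

void compile_one(std::function<void()> compile_work) {
    std::unique_ptr<std::function<void()>> task;
    std::shared_ptr<Executor> executor;
    {
        std::lock_guard<std::mutex> guard(cache_lock);
        task = std::make_unique<std::function<void()>>(std::move(compile_work));
        executor = bound_executor;            // only copy the pointer while locked
    }
    if (executor) {
        task = executor->execute(std::move(task));   // may block, but no lock is held here
    }
    if (task) {                                      // handed back, or no executor bound:
        std::thread inline_worker([&task] { (*task)(); });
        inline_worker.join();                        // compile synchronously ourselves
    }
}

int main() {
    bound_executor = std::make_shared<Executor>();
    compile_one([] { /* expensive compilation would run here */ });
    bound_executor.reset();
}
// --- end sketch --------------------------------------------------------------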
diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.h b/eval/src/vespa/eval/eval/llvm/compile_cache.h
index e8f87e454d3..61d0cc83d94 100644
--- a/eval/src/vespa/eval/eval/llvm/compile_cache.h
+++ b/eval/src/vespa/eval/eval/llvm/compile_cache.h
@@ -23,17 +23,22 @@ class CompileCache
{
private:
using Key = vespalib::string;
- struct Value {
- size_t num_refs;
+ struct Result {
+ using SP = std::shared_ptr<Result>;
std::atomic<const CompiledFunction *> cf;
- std::mutex result_lock;
+ std::mutex lock;
std::condition_variable cond;
CompiledFunction::UP compiled_function;
+ Result() : cf(nullptr), lock(), cond(), compiled_function(nullptr) {}
+ };
+ struct Value {
+ size_t num_refs;
+ Result::SP result;
struct ctor_tag {};
- Value(ctor_tag) : num_refs(1), cf(nullptr), result_lock(), cond(), compiled_function() {}
+ Value(ctor_tag) : num_refs(1), result(std::make_shared<Result>()) {}
const CompiledFunction &wait_for_result();
const CompiledFunction &get() {
- const CompiledFunction *ptr = cf.load(std::memory_order_acquire);
+ const CompiledFunction *ptr = result->cf.load(std::memory_order_acquire);
if (ptr == nullptr) {
return wait_for_result();
}
@@ -44,10 +49,10 @@ private:
static std::mutex _lock;
static Map _cached;
static uint64_t _executor_tag;
- static std::vector<std::pair<uint64_t,Executor*>> _executor_stack;
+ static std::vector<std::pair<uint64_t,std::shared_ptr<Executor>>> _executor_stack;
static void release(Map::iterator entry);
- static uint64_t attach_executor(Executor &executor);
+ static uint64_t attach_executor(std::shared_ptr<Executor> executor);
static void detach_executor(uint64_t tag);
public:
@@ -55,7 +60,6 @@ public:
{
private:
friend class CompileCache;
- friend class CompileTask;
struct ctor_tag {};
CompileCache::Map::iterator _entry;
public:
@@ -80,14 +84,15 @@ public:
ExecutorBinding &operator=(ExecutorBinding &&) = delete;
ExecutorBinding &operator=(const ExecutorBinding &) = delete;
using UP = std::unique_ptr<ExecutorBinding>;
- explicit ExecutorBinding(Executor &executor, ctor_tag) : _tag(attach_executor(executor)) {}
+ explicit ExecutorBinding(std::shared_ptr<Executor> executor, ctor_tag)
+ : _tag(attach_executor(std::move(executor))) {}
~ExecutorBinding() { detach_executor(_tag); }
};
static Token::UP compile(const Function &function, PassParams pass_params);
static void wait_pending();
- static ExecutorBinding::UP bind(Executor &executor) {
- return std::make_unique<ExecutorBinding>(executor, ExecutorBinding::ctor_tag());
+ static ExecutorBinding::UP bind(std::shared_ptr<Executor> executor) {
+ return std::make_unique<ExecutorBinding>(std::move(executor), ExecutorBinding::ctor_tag());
}
static size_t num_cached();
static size_t num_bound();
@@ -98,9 +103,9 @@ private:
struct CompileTask : public Executor::Task {
std::shared_ptr<Function const> function;
PassParams pass_params;
- Token::UP token;
- CompileTask(const Function &function_in, PassParams pass_params_in, Token::UP token_in)
- : function(function_in.shared_from_this()), pass_params(pass_params_in), token(std::move(token_in)) {}
+ Result::SP result;
+ CompileTask(const Function &function_in, PassParams pass_params_in, Result::SP result_in)
+ : function(function_in.shared_from_this()), pass_params(pass_params_in), result(std::move(result_in)) {}
void run() override;
};
};
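
Note on the header change above: the synchronization state (atomic pointer, mutex, condition variable, owning pointer) moved out of Value into a shared Result, and CompileTask now keeps a Result::SP instead of a cache Token, so completing or destroying a task never touches the cache map or _lock. A minimal sketch of that publish/wait pattern, with an int standing in for CompiledFunction and hypothetical publish/wait_for helpers:

// --- sketch (not part of the patch) -----------------------------------------
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

struct Result {
    std::atomic<const int*> ready{nullptr};   // fast path: lock-free "is it done yet?"
    std::mutex lock;
    std::condition_variable cond;
    std::unique_ptr<int> value;               // stand-in for CompiledFunction::UP
};

// producer side: roughly what CompileTask::run() does with its Result::SP
void publish(const std::shared_ptr<Result> &result, int computed) {
    auto owned = std::make_unique<int>(computed);
    std::lock_guard<std::mutex> guard(result->lock);
    result->value = std::move(owned);
    result->ready.store(result->value.get(), std::memory_order_release);
    result->cond.notify_all();
}

// consumer side: roughly what Value::get() / wait_for_result() do
int wait_for(const std::shared_ptr<Result> &result) {
    if (const int *fast = result->ready.load(std::memory_order_acquire)) {
        return *fast;                         // already published, no locking needed
    }
    std::unique_lock<std::mutex> guard(result->lock);
    result->cond.wait(guard, [&] { return bool(result->value); });
    return *result->value;
}

int main() {
    auto result = std::make_shared<Result>();
    std::thread producer([result] { publish(result, 42); });
    int value = wait_for(result);
    producer.join();
    return (value == 42) ? 0 : 1;
}
// --- end sketch --------------------------------------------------------------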
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 719ba359ccf..962ee65c10d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -302,8 +302,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_warmupExecutor = std::make_unique<vespalib::ThreadStackExecutor>(4, 128*1024, index_warmup_executor);
const size_t sharedThreads = deriveCompactionCompressionThreads(protonConfig, hwInfo.cpu());
- _sharedExecutor = std::make_unique<vespalib::BlockingThreadStackExecutor>(sharedThreads, 128*1024, sharedThreads*16, proton_shared_executor);
- _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(*_sharedExecutor);
+ _sharedExecutor = std::make_shared<vespalib::BlockingThreadStackExecutor>(sharedThreads, 128*1024, sharedThreads*16, proton_shared_executor);
+ _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_sharedExecutor);
InitializeThreads initializeThreads;
if (protonConfig.initialize.threads > 0) {
initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128 * 1024, initialize_executor);
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index d0b76bd5804..d5c1a8b7b78 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -112,7 +112,7 @@ private:
ProtonConfigurer _protonConfigurer;
ProtonConfigFetcher _protonConfigFetcher;
std::unique_ptr<vespalib::ThreadStackExecutorBase> _warmupExecutor;
- std::unique_ptr<vespalib::ThreadStackExecutorBase> _sharedExecutor;
+ std::shared_ptr<vespalib::ThreadStackExecutorBase> _sharedExecutor;
vespalib::eval::CompileCache::ExecutorBinding::UP _compile_cache_executor_binding;
matching::QueryLimiter _queryLimiter;
vespalib::Clock _clock;
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index efb1dbf4054..ad5d78d5ab6 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -87,6 +87,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
if (!worker.idle) {
assert(_taskCount != 0);
--_taskCount;
+ wakeup(monitor);
_barrier.completeEvent(worker.task.token);
worker.idle = true;
}
@@ -96,7 +97,6 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
worker.task = std::move(_tasks.front());
worker.idle = false;
_tasks.pop();
- wakeup(monitor);
return true;
}
if (_closed) {