diff options
author | HÃ¥vard Pettersen <3535158+havardpe@users.noreply.github.com> | 2019-12-13 14:54:22 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-13 14:54:22 +0100 |
commit | 4eccbb84c84c9aa221b61fcac50fa5194d37f0fe (patch) | |
tree | 535b2083a7712a5b8b2eafd0313945890fd4a2e9 /eval | |
parent | c7b9bb6679d67510881e3bba725f5f368783772e (diff) | |
parent | e4cbe3c68e5ab24e8331daa6263496fa028d49a3 (diff) |
Merge pull request #11563 from vespa-engine/havardpe/more-robust-executor-binding
more robust executor binding
Diffstat (limited to 'eval')
-rw-r--r-- | eval/src/tests/eval/compile_cache/compile_cache_test.cpp | 98 | ||||
-rw-r--r-- | eval/src/vespa/eval/eval/llvm/compile_cache.cpp | 42 | ||||
-rw-r--r-- | eval/src/vespa/eval/eval/llvm/compile_cache.h | 26 |
3 files changed, 146 insertions, 20 deletions
diff --git a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp index 09bc2f2e0ee..c164972c2e1 100644 --- a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp +++ b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp @@ -11,6 +11,21 @@ using namespace vespalib; using namespace vespalib::eval; +struct MyExecutor : public Executor { + std::vector<Executor::Task::UP> tasks; + Executor::Task::UP execute(Executor::Task::UP task) override { + tasks.push_back(std::move(task)); + return Executor::Task::UP(); + } + void run_tasks() { + for (const auto &task: tasks) { + task.get()->run(); + } + tasks.clear(); + } + ~MyExecutor() { run_tasks(); } +}; + //----------------------------------------------------------------------------- TEST("require that parameter passing selection affects function key") { @@ -141,6 +156,86 @@ TEST("require that cache usage works") { TEST_DO(verify_cache(0, 0)); } +TEST("require that async cache usage works") { + ThreadStackExecutor executor(8, 256*1024); + auto binding = CompileCache::bind(executor); + TEST_DO(verify_cache(0, 0)); + CompileCache::Token::UP token_a = CompileCache::compile(*Function::parse("x+y"), PassParams::SEPARATE); + EXPECT_EQUAL(5.0, token_a->get().get_function<2>()(2.0, 3.0)); + TEST_DO(verify_cache(1, 1)); + CompileCache::Token::UP token_b = CompileCache::compile(*Function::parse("x*y"), PassParams::SEPARATE); + EXPECT_EQUAL(6.0, token_b->get().get_function<2>()(2.0, 3.0)); + TEST_DO(verify_cache(2, 2)); + CompileCache::Token::UP token_c = CompileCache::compile(*Function::parse("x+y"), PassParams::SEPARATE); + EXPECT_EQUAL(5.0, token_c->get().get_function<2>()(2.0, 3.0)); + TEST_DO(verify_cache(2, 3)); + executor.sync(); // wait for compile threads to drop all compile cache tokens + token_a.reset(); + TEST_DO(verify_cache(2, 2)); + token_b.reset(); + TEST_DO(verify_cache(1, 1)); + token_c.reset(); + TEST_DO(verify_cache(0, 0)); +} + +TEST("require that compile tasks are run in the most recently bound executor") { + MyExecutor exe1; + MyExecutor exe2; + auto token0 = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE); + EXPECT_EQUAL(CompileCache::num_bound(), 0u); + EXPECT_EQUAL(exe1.tasks.size(), 0u); + EXPECT_EQUAL(exe2.tasks.size(), 0u); + { + auto bind1 = CompileCache::bind(exe1); + auto token1 = CompileCache::compile(*Function::parse("a-b"), PassParams::SEPARATE); + EXPECT_EQUAL(CompileCache::num_bound(), 1u); + EXPECT_EQUAL(exe1.tasks.size(), 1u); + EXPECT_EQUAL(exe2.tasks.size(), 0u); + { + auto bind2 = CompileCache::bind(exe2); + auto token2 = CompileCache::compile(*Function::parse("a*b"), PassParams::SEPARATE); + EXPECT_EQUAL(CompileCache::num_bound(), 2u); + EXPECT_EQUAL(exe1.tasks.size(), 1u); + EXPECT_EQUAL(exe2.tasks.size(), 1u); + } + EXPECT_EQUAL(CompileCache::num_bound(), 1u); + } + EXPECT_EQUAL(CompileCache::num_bound(), 0u); +} + +TEST("require that executors may be unbound in any order") { + MyExecutor exe1; + MyExecutor exe2; + MyExecutor exe3; + auto bind1 = CompileCache::bind(exe1); + auto bind2 = CompileCache::bind(exe2); + auto bind3 = CompileCache::bind(exe3); + EXPECT_EQUAL(CompileCache::num_bound(), 3u); + bind2.reset(); + EXPECT_EQUAL(CompileCache::num_bound(), 2u); + bind3.reset(); + EXPECT_EQUAL(CompileCache::num_bound(), 1u); + auto token = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE); + EXPECT_EQUAL(exe1.tasks.size(), 1u); + EXPECT_EQUAL(exe2.tasks.size(), 0u); + EXPECT_EQUAL(exe3.tasks.size(), 0u); +} + +TEST("require that the same executor can be bound multiple times") { + MyExecutor exe1; + auto bind1 = CompileCache::bind(exe1); + auto bind2 = CompileCache::bind(exe1); + auto bind3 = CompileCache::bind(exe1); + EXPECT_EQUAL(CompileCache::num_bound(), 3u); + bind2.reset(); + EXPECT_EQUAL(CompileCache::num_bound(), 2u); + bind3.reset(); + EXPECT_EQUAL(CompileCache::num_bound(), 1u); + auto token = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE); + EXPECT_EQUAL(CompileCache::num_bound(), 1u); + EXPECT_EQUAL(exe1.tasks.size(), 1u); +} + struct CompileCheck : test::EvalSpec::EvalTest { struct Entry { CompileCache::Token::UP fun; @@ -194,7 +289,7 @@ TEST_F("compile sequentially, then run all conformance tests", test::EvalSpec()) TEST_F("compile concurrently (8 threads), then run all conformance tests", test::EvalSpec()) { f1.add_all_cases(); ThreadStackExecutor executor(8, 256*1024); - CompileCache::attach_executor(executor); + auto binding = CompileCache::bind(executor); while (executor.num_idle_workers() < 8) { std::this_thread::sleep_for(1ms); } @@ -210,7 +305,6 @@ TEST_F("compile concurrently (8 threads), then run all conformance tests", test: fprintf(stderr, "concurrent (run %zu): setup: %zu ms, wait: %zu ms, verify: %zu us, total: %zu ms\n", i, count_ms(t1 - t0), count_ms(t2 - t1), count_us(t3 - t2), count_ms(t3 - t0)); } - CompileCache::detach_executor(); } //----------------------------------------------------------------------------- diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp index 4f93c496f2a..b486a750992 100644 --- a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp +++ b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp @@ -8,7 +8,8 @@ namespace eval { std::mutex CompileCache::_lock{}; CompileCache::Map CompileCache::_cached{}; -Executor *CompileCache::_executor{nullptr}; +uint64_t CompileCache::_executor_tag{0}; +std::vector<std::pair<uint64_t,Executor*>> CompileCache::_executor_stack{}; const CompiledFunction & CompileCache::Value::wait_for_result() @@ -27,6 +28,24 @@ CompileCache::release(Map::iterator entry) } } +uint64_t +CompileCache::attach_executor(Executor &executor) +{ + std::lock_guard<std::mutex> guard(_lock); + _executor_stack.emplace_back(++_executor_tag, &executor); + return _executor_tag; +} + +void +CompileCache::detach_executor(uint64_t tag) +{ + std::lock_guard<std::mutex> guard(_lock); + auto &list = _executor_stack; + list.erase(std::remove_if(list.begin(), list.end(), + [tag](const auto &a){ return (a.first == tag); }), + list.end()); +}; + CompileCache::Token::UP CompileCache::compile(const Function &function, PassParams pass_params) { @@ -46,8 +65,8 @@ CompileCache::compile(const Function &function, PassParams pass_params) ++(res.first->second.num_refs); task = std::make_unique<CompileTask>(function, pass_params, std::make_unique<Token>(res.first, Token::ctor_tag())); - if (_executor != nullptr) { - task = _executor->execute(std::move(task)); + if (!_executor_stack.empty()) { + task = _executor_stack.back().second->execute(std::move(task)); } } } @@ -57,25 +76,18 @@ CompileCache::compile(const Function &function, PassParams pass_params) return token; } -void -CompileCache::attach_executor(Executor &executor) -{ - std::lock_guard<std::mutex> guard(_lock); - _executor = &executor; -} - -void -CompileCache::detach_executor() +size_t +CompileCache::num_cached() { std::lock_guard<std::mutex> guard(_lock); - _executor = nullptr; + return _cached.size(); } size_t -CompileCache::num_cached() +CompileCache::num_bound() { std::lock_guard<std::mutex> guard(_lock); - return _cached.size(); + return _executor_stack.size(); } size_t diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.h b/eval/src/vespa/eval/eval/llvm/compile_cache.h index 0adca808256..65cec9c0d48 100644 --- a/eval/src/vespa/eval/eval/llvm/compile_cache.h +++ b/eval/src/vespa/eval/eval/llvm/compile_cache.h @@ -41,9 +41,12 @@ private: using Map = std::map<Key,Value>; static std::mutex _lock; static Map _cached; - static Executor *_executor; + static uint64_t _executor_tag; + static std::vector<std::pair<uint64_t,Executor*>> _executor_stack; static void release(Map::iterator entry); + static uint64_t attach_executor(Executor &executor); + static void detach_executor(uint64_t tag); public: class Token @@ -64,10 +67,27 @@ public: ~Token() { CompileCache::release(_entry); } }; + class ExecutorBinding { + private: + friend class CompileCache; + uint64_t _tag; + struct ctor_tag {}; + public: + ExecutorBinding(ExecutorBinding &&) = delete; + ExecutorBinding(const ExecutorBinding &) = delete; + ExecutorBinding &operator=(ExecutorBinding &&) = delete; + ExecutorBinding &operator=(const ExecutorBinding &) = delete; + using UP = std::unique_ptr<ExecutorBinding>; + explicit ExecutorBinding(Executor &executor, ctor_tag) : _tag(attach_executor(executor)) {} + ~ExecutorBinding() { detach_executor(_tag); } + }; + static Token::UP compile(const Function &function, PassParams pass_params); - static void attach_executor(Executor &executor); - static void detach_executor(); + static ExecutorBinding::UP bind(Executor &executor) { + return std::make_unique<ExecutorBinding>(executor, ExecutorBinding::ctor_tag()); + } static size_t num_cached(); + static size_t num_bound(); static size_t count_refs(); static size_t count_pending(); |