aboutsummaryrefslogtreecommitdiffstats
path: root/eval
diff options
context:
space:
mode:
authorHÃ¥vard Pettersen <3535158+havardpe@users.noreply.github.com>2019-12-13 14:54:22 +0100
committerGitHub <noreply@github.com>2019-12-13 14:54:22 +0100
commit4eccbb84c84c9aa221b61fcac50fa5194d37f0fe (patch)
tree535b2083a7712a5b8b2eafd0313945890fd4a2e9 /eval
parentc7b9bb6679d67510881e3bba725f5f368783772e (diff)
parente4cbe3c68e5ab24e8331daa6263496fa028d49a3 (diff)
Merge pull request #11563 from vespa-engine/havardpe/more-robust-executor-binding
more robust executor binding
Diffstat (limited to 'eval')
-rw-r--r--eval/src/tests/eval/compile_cache/compile_cache_test.cpp98
-rw-r--r--eval/src/vespa/eval/eval/llvm/compile_cache.cpp42
-rw-r--r--eval/src/vespa/eval/eval/llvm/compile_cache.h26
3 files changed, 146 insertions, 20 deletions
diff --git a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
index 09bc2f2e0ee..c164972c2e1 100644
--- a/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
+++ b/eval/src/tests/eval/compile_cache/compile_cache_test.cpp
@@ -11,6 +11,21 @@
using namespace vespalib;
using namespace vespalib::eval;
+struct MyExecutor : public Executor {
+ std::vector<Executor::Task::UP> tasks;
+ Executor::Task::UP execute(Executor::Task::UP task) override {
+ tasks.push_back(std::move(task));
+ return Executor::Task::UP();
+ }
+ void run_tasks() {
+ for (const auto &task: tasks) {
+ task.get()->run();
+ }
+ tasks.clear();
+ }
+ ~MyExecutor() { run_tasks(); }
+};
+
//-----------------------------------------------------------------------------
TEST("require that parameter passing selection affects function key") {
@@ -141,6 +156,86 @@ TEST("require that cache usage works") {
TEST_DO(verify_cache(0, 0));
}
+TEST("require that async cache usage works") {
+ ThreadStackExecutor executor(8, 256*1024);
+ auto binding = CompileCache::bind(executor);
+ TEST_DO(verify_cache(0, 0));
+ CompileCache::Token::UP token_a = CompileCache::compile(*Function::parse("x+y"), PassParams::SEPARATE);
+ EXPECT_EQUAL(5.0, token_a->get().get_function<2>()(2.0, 3.0));
+ TEST_DO(verify_cache(1, 1));
+ CompileCache::Token::UP token_b = CompileCache::compile(*Function::parse("x*y"), PassParams::SEPARATE);
+ EXPECT_EQUAL(6.0, token_b->get().get_function<2>()(2.0, 3.0));
+ TEST_DO(verify_cache(2, 2));
+ CompileCache::Token::UP token_c = CompileCache::compile(*Function::parse("x+y"), PassParams::SEPARATE);
+ EXPECT_EQUAL(5.0, token_c->get().get_function<2>()(2.0, 3.0));
+ TEST_DO(verify_cache(2, 3));
+ executor.sync(); // wait for compile threads to drop all compile cache tokens
+ token_a.reset();
+ TEST_DO(verify_cache(2, 2));
+ token_b.reset();
+ TEST_DO(verify_cache(1, 1));
+ token_c.reset();
+ TEST_DO(verify_cache(0, 0));
+}
+
+TEST("require that compile tasks are run in the most recently bound executor") {
+ MyExecutor exe1;
+ MyExecutor exe2;
+ auto token0 = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE);
+ EXPECT_EQUAL(CompileCache::num_bound(), 0u);
+ EXPECT_EQUAL(exe1.tasks.size(), 0u);
+ EXPECT_EQUAL(exe2.tasks.size(), 0u);
+ {
+ auto bind1 = CompileCache::bind(exe1);
+ auto token1 = CompileCache::compile(*Function::parse("a-b"), PassParams::SEPARATE);
+ EXPECT_EQUAL(CompileCache::num_bound(), 1u);
+ EXPECT_EQUAL(exe1.tasks.size(), 1u);
+ EXPECT_EQUAL(exe2.tasks.size(), 0u);
+ {
+ auto bind2 = CompileCache::bind(exe2);
+ auto token2 = CompileCache::compile(*Function::parse("a*b"), PassParams::SEPARATE);
+ EXPECT_EQUAL(CompileCache::num_bound(), 2u);
+ EXPECT_EQUAL(exe1.tasks.size(), 1u);
+ EXPECT_EQUAL(exe2.tasks.size(), 1u);
+ }
+ EXPECT_EQUAL(CompileCache::num_bound(), 1u);
+ }
+ EXPECT_EQUAL(CompileCache::num_bound(), 0u);
+}
+
+TEST("require that executors may be unbound in any order") {
+ MyExecutor exe1;
+ MyExecutor exe2;
+ MyExecutor exe3;
+ auto bind1 = CompileCache::bind(exe1);
+ auto bind2 = CompileCache::bind(exe2);
+ auto bind3 = CompileCache::bind(exe3);
+ EXPECT_EQUAL(CompileCache::num_bound(), 3u);
+ bind2.reset();
+ EXPECT_EQUAL(CompileCache::num_bound(), 2u);
+ bind3.reset();
+ EXPECT_EQUAL(CompileCache::num_bound(), 1u);
+ auto token = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE);
+ EXPECT_EQUAL(exe1.tasks.size(), 1u);
+ EXPECT_EQUAL(exe2.tasks.size(), 0u);
+ EXPECT_EQUAL(exe3.tasks.size(), 0u);
+}
+
+TEST("require that the same executor can be bound multiple times") {
+ MyExecutor exe1;
+ auto bind1 = CompileCache::bind(exe1);
+ auto bind2 = CompileCache::bind(exe1);
+ auto bind3 = CompileCache::bind(exe1);
+ EXPECT_EQUAL(CompileCache::num_bound(), 3u);
+ bind2.reset();
+ EXPECT_EQUAL(CompileCache::num_bound(), 2u);
+ bind3.reset();
+ EXPECT_EQUAL(CompileCache::num_bound(), 1u);
+ auto token = CompileCache::compile(*Function::parse("a+b"), PassParams::SEPARATE);
+ EXPECT_EQUAL(CompileCache::num_bound(), 1u);
+ EXPECT_EQUAL(exe1.tasks.size(), 1u);
+}
+
struct CompileCheck : test::EvalSpec::EvalTest {
struct Entry {
CompileCache::Token::UP fun;
@@ -194,7 +289,7 @@ TEST_F("compile sequentially, then run all conformance tests", test::EvalSpec())
TEST_F("compile concurrently (8 threads), then run all conformance tests", test::EvalSpec()) {
f1.add_all_cases();
ThreadStackExecutor executor(8, 256*1024);
- CompileCache::attach_executor(executor);
+ auto binding = CompileCache::bind(executor);
while (executor.num_idle_workers() < 8) {
std::this_thread::sleep_for(1ms);
}
@@ -210,7 +305,6 @@ TEST_F("compile concurrently (8 threads), then run all conformance tests", test:
fprintf(stderr, "concurrent (run %zu): setup: %zu ms, wait: %zu ms, verify: %zu us, total: %zu ms\n",
i, count_ms(t1 - t0), count_ms(t2 - t1), count_us(t3 - t2), count_ms(t3 - t0));
}
- CompileCache::detach_executor();
}
//-----------------------------------------------------------------------------
diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
index 4f93c496f2a..b486a750992 100644
--- a/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
+++ b/eval/src/vespa/eval/eval/llvm/compile_cache.cpp
@@ -8,7 +8,8 @@ namespace eval {
std::mutex CompileCache::_lock{};
CompileCache::Map CompileCache::_cached{};
-Executor *CompileCache::_executor{nullptr};
+uint64_t CompileCache::_executor_tag{0};
+std::vector<std::pair<uint64_t,Executor*>> CompileCache::_executor_stack{};
const CompiledFunction &
CompileCache::Value::wait_for_result()
@@ -27,6 +28,24 @@ CompileCache::release(Map::iterator entry)
}
}
+uint64_t
+CompileCache::attach_executor(Executor &executor)
+{
+ std::lock_guard<std::mutex> guard(_lock);
+ _executor_stack.emplace_back(++_executor_tag, &executor);
+ return _executor_tag;
+}
+
+void
+CompileCache::detach_executor(uint64_t tag)
+{
+ std::lock_guard<std::mutex> guard(_lock);
+ auto &list = _executor_stack;
+ list.erase(std::remove_if(list.begin(), list.end(),
+ [tag](const auto &a){ return (a.first == tag); }),
+ list.end());
+};
+
CompileCache::Token::UP
CompileCache::compile(const Function &function, PassParams pass_params)
{
@@ -46,8 +65,8 @@ CompileCache::compile(const Function &function, PassParams pass_params)
++(res.first->second.num_refs);
task = std::make_unique<CompileTask>(function, pass_params,
std::make_unique<Token>(res.first, Token::ctor_tag()));
- if (_executor != nullptr) {
- task = _executor->execute(std::move(task));
+ if (!_executor_stack.empty()) {
+ task = _executor_stack.back().second->execute(std::move(task));
}
}
}
@@ -57,25 +76,18 @@ CompileCache::compile(const Function &function, PassParams pass_params)
return token;
}
-void
-CompileCache::attach_executor(Executor &executor)
-{
- std::lock_guard<std::mutex> guard(_lock);
- _executor = &executor;
-}
-
-void
-CompileCache::detach_executor()
+size_t
+CompileCache::num_cached()
{
std::lock_guard<std::mutex> guard(_lock);
- _executor = nullptr;
+ return _cached.size();
}
size_t
-CompileCache::num_cached()
+CompileCache::num_bound()
{
std::lock_guard<std::mutex> guard(_lock);
- return _cached.size();
+ return _executor_stack.size();
}
size_t
diff --git a/eval/src/vespa/eval/eval/llvm/compile_cache.h b/eval/src/vespa/eval/eval/llvm/compile_cache.h
index 0adca808256..65cec9c0d48 100644
--- a/eval/src/vespa/eval/eval/llvm/compile_cache.h
+++ b/eval/src/vespa/eval/eval/llvm/compile_cache.h
@@ -41,9 +41,12 @@ private:
using Map = std::map<Key,Value>;
static std::mutex _lock;
static Map _cached;
- static Executor *_executor;
+ static uint64_t _executor_tag;
+ static std::vector<std::pair<uint64_t,Executor*>> _executor_stack;
static void release(Map::iterator entry);
+ static uint64_t attach_executor(Executor &executor);
+ static void detach_executor(uint64_t tag);
public:
class Token
@@ -64,10 +67,27 @@ public:
~Token() { CompileCache::release(_entry); }
};
+ class ExecutorBinding {
+ private:
+ friend class CompileCache;
+ uint64_t _tag;
+ struct ctor_tag {};
+ public:
+ ExecutorBinding(ExecutorBinding &&) = delete;
+ ExecutorBinding(const ExecutorBinding &) = delete;
+ ExecutorBinding &operator=(ExecutorBinding &&) = delete;
+ ExecutorBinding &operator=(const ExecutorBinding &) = delete;
+ using UP = std::unique_ptr<ExecutorBinding>;
+ explicit ExecutorBinding(Executor &executor, ctor_tag) : _tag(attach_executor(executor)) {}
+ ~ExecutorBinding() { detach_executor(_tag); }
+ };
+
static Token::UP compile(const Function &function, PassParams pass_params);
- static void attach_executor(Executor &executor);
- static void detach_executor();
+ static ExecutorBinding::UP bind(Executor &executor) {
+ return std::make_unique<ExecutorBinding>(executor, ExecutorBinding::ctor_tag());
+ }
static size_t num_cached();
+ static size_t num_bound();
static size_t count_refs();
static size_t count_pending();