diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2022-11-02 11:29:55 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2022-11-02 11:29:55 +0000 |
commit | 1737c2d97b98eb2079eba7003043ffafe77edbc6 (patch) | |
tree | c649382f9dadb8359579096647dbbdd17dddce17 /vespalib | |
parent | 88463ea1b79873a63bf0aa6aa2813e7abfa85659 (diff) |
enable scheduling a coroutine on an executor
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/coro/lazy/lazy_test.cpp | 65 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/schedule.h | 76 |
2 files changed, 111 insertions, 30 deletions
diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp index b838152249e..ead5e8e6427 100644 --- a/vespalib/src/tests/coro/lazy/lazy_test.cpp +++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp @@ -2,37 +2,21 @@ #include <vespa/vespalib/coro/lazy.h> #include <vespa/vespalib/coro/sync_wait.h> +#include <vespa/vespalib/coro/schedule.h> +#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/require.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/gtest/gtest.h> #include <mutex> #include <thread> +using vespalib::Executor; using vespalib::coro::Lazy; +using vespalib::coro::ScheduleFailedException; +using vespalib::coro::schedule; using vespalib::coro::sync_wait; - -std::mutex thread_lock; -std::vector<std::thread> threads; -struct JoinThreads { - ~JoinThreads() { - for (auto &thread: threads) { - thread.join(); - } - threads.clear(); - } -}; - -auto run_in_other_thread() { - struct awaiter { - bool await_ready() const noexcept { return false; } - void await_suspend(std::coroutine_handle<> handle) const { - auto guard = std::lock_guard(thread_lock); - threads.push_back(std::thread(handle)); - } - void await_resume() const noexcept {} - }; - return awaiter(); -} +using vespalib::coro::try_schedule; Lazy<int> make_lazy(int value) { co_return value; @@ -68,9 +52,17 @@ Lazy<T> forward_value(Lazy<T> value) { } template <typename T> -Lazy<T> switch_thread(Lazy<T> value) { +Lazy<std::pair<bool,T>> try_schedule_on(Executor &executor, Lazy<T> value) { std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl; - co_await run_in_other_thread(); + bool accepted = co_await try_schedule(executor); + std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl; + co_return std::make_pair(accepted, co_await value); +} + +template <typename T> +Lazy<T> schedule_on(Executor &executor, Lazy<T> value) { + std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl; + co_await schedule(executor); std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl; co_return co_await value; } @@ -107,15 +99,28 @@ TEST(LazyTest, extract_rvalue_from_lazy_in_sync_wait) { } TEST(LazyTest, calculate_result_in_another_thread) { - JoinThreads thread_guard; - auto result = sync_wait(switch_thread(make_lazy(7))); - EXPECT_EQ(result, 7); + vespalib::ThreadStackExecutor executor(1, 128_Ki); + auto result = sync_wait(try_schedule_on(executor, make_lazy(7))); + EXPECT_EQ(result.first, true); + EXPECT_EQ(result.second, 7); + auto result2 = sync_wait(schedule_on(executor, make_lazy(8))); + EXPECT_EQ(result2, 8); } TEST(LazyTest, exceptions_are_propagated) { - JoinThreads thread_guard; - auto lazy = switch_thread(forward_value(will_throw())); + vespalib::ThreadStackExecutor executor(1, 128_Ki); + auto lazy = try_schedule_on(executor, forward_value(will_throw())); EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException); } +TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) { + vespalib::ThreadStackExecutor executor(1, 128_Ki); + executor.shutdown(); + auto result = sync_wait(try_schedule_on(executor, make_lazy(7))); + EXPECT_EQ(result.first, false); + EXPECT_EQ(result.second, 7); + auto lazy = schedule_on(executor, make_lazy(8)); + EXPECT_THROW(sync_wait(lazy), ScheduleFailedException); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/vespa/vespalib/coro/schedule.h b/vespalib/src/vespa/vespalib/coro/schedule.h new file mode 100644 index 00000000000..6dfa5b9536c --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/schedule.h @@ -0,0 +1,76 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/executor.h> +#include <coroutine> +#include <exception> + +namespace vespalib::coro { + +struct ScheduleFailedException : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +// Schedule the current coroutine on the given executor. Throws an +// exception if the request was rejected by the executor. + +auto schedule(Executor &executor) { + struct [[nodiscard]] awaiter { + Executor &executor; + awaiter(Executor &executor_in) + : executor(executor_in) {} + bool await_ready() const noexcept { return false; } + void await_suspend(std::coroutine_handle<> handle) { + struct ResumeTask : Executor::Task { + std::coroutine_handle<> handle; + ResumeTask(std::coroutine_handle<> handle_in) + : handle(handle_in) {} + void run() override { handle.resume(); } + }; + Executor::Task::UP task = std::make_unique<ResumeTask>(handle); + task = executor.execute(std::move(task)); + if (task) { + throw ScheduleFailedException("rejected by executor"); + } + } + void await_resume() const noexcept {} + }; + return awaiter(executor); +} + +// Try to schedule the current coroutine on the given executor. If the +// awaiter returns true, the coroutine is now run by the executor. If +// the awaiter returns false, the request was rejected by the executor +// and the coroutine is still running in our original context. + +auto try_schedule(Executor &executor) { + struct [[nodiscard]] awaiter { + Executor &executor; + bool accepted; + awaiter(Executor &executor_in) + : executor(executor_in), accepted(true) {} + bool await_ready() const noexcept { return false; } + bool await_suspend(std::coroutine_handle<> handle) { + struct ResumeTask : Executor::Task { + std::coroutine_handle<> handle; + ResumeTask(std::coroutine_handle<> handle_in) + : handle(handle_in) {} + void run() override { handle.resume(); } + }; + Executor::Task::UP task = std::make_unique<ResumeTask>(handle); + task = executor.execute(std::move(task)); + if (task) { + // need to start with accepted == true to avoid race + // with handle.resume() from executor thread before + // await_suspend has returned. + accepted = false; + } + return accepted; + } + [[nodiscard]] bool await_resume() const noexcept { return accepted; } + }; + return awaiter(executor); +} + +} |