diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2022-10-12 09:40:03 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2022-10-12 15:24:29 +0000 |
commit | 8568274e54d065ae904907537dcf4ce6a9581bfa (patch) | |
tree | bf3ad64563670aca5bfc98c88171fe9bc7443280 /vespalib | |
parent | 0d6d4159ba109f78fd9fea0c6b5e8d940ffc7e90 (diff) |
more coroutines
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/coro/lazy/lazy_test.cpp | 74 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/detached.h | 12 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/lazy.h | 79 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/sync_wait.h | 31 |
4 files changed, 173 insertions, 23 deletions
diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp index a715e473aaf..4ba1c950ad0 100644 --- a/vespalib/src/tests/coro/lazy/lazy_test.cpp +++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp @@ -2,11 +2,35 @@ #include <vespa/vespalib/coro/lazy.h> #include <vespa/vespalib/coro/sync_wait.h> +#include <vespa/vespalib/util/require.h> #include <vespa/vespalib/gtest/gtest.h> +#include <thread> + using vespalib::coro::Lazy; using vespalib::coro::sync_wait; +std::vector<std::thread> threads; +struct JoinThreads { + ~JoinThreads() { + for (auto &thread: threads) { + thread.join(); + } + threads.clear(); + } +}; + +auto run_in_other_thread() { + struct awaiter { + bool await_ready() const noexcept { return false; } + void await_suspend(std::coroutine_handle<> handle) const { + threads.push_back(std::thread(handle)); + } + void await_resume() const noexcept {} + }; + return awaiter(); +} + Lazy<int> make_lazy(int value) { co_return value; } @@ -21,6 +45,33 @@ Lazy<int> async_sum(Lazy<int> a, Lazy<int> b) { co_return (co_await a + co_await b); } +Lazy<std::unique_ptr<int>> move_only_int() { + co_return std::make_unique<int>(123); +} + +Lazy<int> extract_rvalue() { + auto res = co_await move_only_int(); + co_return *res; +} + +Lazy<int> will_throw() { + REQUIRE_FAILED("failed on purpose"); + co_return 123; +} + +template<typename T> +Lazy<T> forward_value(Lazy<T> value) { + co_return co_await std::move(value); +} + +template <typename T> +Lazy<T> switch_thread(Lazy<T> value) { + std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl; + co_await run_in_other_thread(); + std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl; + co_return co_await value; +} + TEST(LazyTest, simple_lazy_value) { auto lazy = make_lazy(42); auto result = sync_wait(lazy); @@ -41,4 +92,27 @@ TEST(LazyTest, async_sum_of_external_async_values) { EXPECT_EQ(result, 300); } +TEST(LazyTest, extract_rvalue_from_lazy_in_coroutine) { + auto lazy = extract_rvalue(); + auto result = sync_wait(lazy); + EXPECT_EQ(result, 123); +} + +TEST(LazyTest, extract_rvalue_from_lazy_in_sync_wait) { + auto result = sync_wait(move_only_int()); + EXPECT_EQ(*result, 123); +} + +TEST(LazyTest, calculate_result_in_another_thread) { + JoinThreads thread_guard; + auto result = sync_wait(switch_thread(make_lazy(7))); + EXPECT_EQ(result, 7); +} + +TEST(LazyTest, exceptions_are_propagated) { + JoinThreads thread_guard; + auto lazy = switch_thread(forward_value(will_throw())); + EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/vespa/vespalib/coro/detached.h b/vespalib/src/vespa/vespalib/coro/detached.h index 6d051e53121..5e3fa1452fa 100644 --- a/vespalib/src/vespa/vespalib/coro/detached.h +++ b/vespalib/src/vespa/vespalib/coro/detached.h @@ -7,6 +7,18 @@ namespace vespalib::coro { +/** + * coroutine return type + * + * The coroutine is eager (will not suspend in initial_suspend) and + * self destroying (will not suspend in final_suspend). The return + * value gives no way of interacting with the coroutine. Without any + * co_await operations this acts similar to a normal subroutine. Note + * that letting a detached coroutine wait for a Lazy<T> will + * essentially attach it to the Lazy<T> as a continuation and resume + * it, but will require the Lazy<T> not to be deleted mid flight + * (started but not completed). + **/ struct Detached { struct promise_type { Detached get_return_object() { return {}; } diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h index b007f565c93..ddfaa07528b 100644 --- a/vespalib/src/vespa/vespalib/coro/lazy.h +++ b/vespalib/src/vespa/vespalib/coro/lazy.h @@ -5,9 +5,22 @@ #include <concepts> #include <coroutine> #include <optional> +#include <exception> namespace vespalib::coro { +/** + * coroutine return type + * + * The coroutine is lazy (will suspend in initial_suspend) and + * destroyed form the outside (will suspend in final_suspend). Waiting + * for a Lazy<T> using co_await will use symmetric transfer to suspend + * the waiting coroutine and resuming this one. The waiting coroutine + * is registered as a continuation and will be resumed again once the + * result is avalable (also using symmetric transfer). The result is + * assumed to be produced asynchronously. If you need to access it + * from the outside (in that specific thread); use sync_wait. + **/ template <std::movable T> class [[nodiscard]] Lazy { public: @@ -18,47 +31,73 @@ public: struct awaiter { bool await_ready() const noexcept { return false; } std::coroutine_handle<> await_suspend(Handle handle) const noexcept { - auto waiter = handle.promise().waiter; - return waiter ? waiter : std::noop_coroutine(); + return handle.promise().waiter; } void await_resume() const noexcept {} }; return awaiter(); } - void return_value(T ret_value) noexcept { + void return_value(const T &ret_value) noexcept(std::is_nothrow_move_constructible_v<T>) requires(std::copy_constructible<T>) { + value = ret_value; + } + void return_value(T &&ret_value) noexcept(std::is_nothrow_move_constructible_v<T>) { value = std::move(ret_value); } - static void unhandled_exception() { std::terminate(); } + void unhandled_exception() noexcept { + exception = std::current_exception(); + } std::optional<T> value; + std::exception_ptr exception; std::coroutine_handle<> waiter; promise_type(promise_type &&) = delete; promise_type(const promise_type &) = delete; - promise_type() : value(std::nullopt), waiter(nullptr) {} + promise_type() : value(std::nullopt), exception(), waiter(std::noop_coroutine()) {} + T &result() & { + if (exception) { + std::rethrow_exception(exception); + } + return *value; + } + T &&result() && { + if (exception) { + std::rethrow_exception(exception); + } + return std::move(*value); + } }; using Handle = std::coroutine_handle<promise_type>; private: Handle _handle; - + + struct awaiter_base { + Handle handle; + awaiter_base(Handle handle_in) noexcept : handle(handle_in) {} + bool await_ready() const noexcept { return handle.done(); } + Handle await_suspend(std::coroutine_handle<> waiter) const noexcept { + handle.promise().waiter = waiter; + return handle; + } + }; + public: Lazy(const Lazy &) = delete; Lazy &operator=(const Lazy &) = delete; explicit Lazy(Handle handle_in) noexcept : _handle(handle_in) {} Lazy(Lazy &&rhs) noexcept : _handle(std::exchange(rhs._handle, nullptr)) {} - auto operator co_await() { - struct awaiter { - Handle handle; - bool await_ready() const noexcept { - return handle.done(); - } - Handle await_suspend(std::coroutine_handle<> waiter) const noexcept { - handle.promise().waiter = waiter; - return handle; - } - T &await_resume() const noexcept { - return *handle.promise().value; - } - awaiter(Handle handle_in) : handle(handle_in) {} + auto operator co_await() & noexcept { + struct awaiter : awaiter_base { + using awaiter_base::handle; + awaiter(Handle handle_in) noexcept : awaiter_base(handle_in) {} + decltype(auto) await_resume() const { return handle.promise().result(); } + }; + return awaiter(_handle); + } + auto operator co_await() && noexcept { + struct awaiter : awaiter_base { + using awaiter_base::handle; + awaiter(Handle handle_in) noexcept : awaiter_base(handle_in) {} + decltype(auto) await_resume() const { return std::move(handle.promise()).result(); } }; return awaiter(_handle); } diff --git a/vespalib/src/vespa/vespalib/coro/sync_wait.h b/vespalib/src/vespa/vespalib/coro/sync_wait.h index e6a8fdc43f6..bdea2dfc7f0 100644 --- a/vespalib/src/vespa/vespalib/coro/sync_wait.h +++ b/vespalib/src/vespa/vespalib/coro/sync_wait.h @@ -4,31 +4,56 @@ #include "detached.h" #include "lazy.h" -#include <coroutine> #include <vespa/vespalib/util/gate.h> +#include <coroutine> +#include <exception> + namespace vespalib::coro { template <typename T, typename S> Detached signal_when_done(Lazy<T> &value, S &sink) { - sink(co_await value); + try { + sink(co_await value); + } catch (...) { + sink(std::current_exception()); + } } +/** + * Wait for a lazy value to be calculated (note that waiting for a + * value will also start calculating it). Make sure the thread waiting + * is not needed in the calculation of the value, or you will end up + * with a deadlock. + **/ template <typename T> T &sync_wait(Lazy<T> &value) { struct MySink { Gate gate; T *result; + std::exception_ptr exception; void operator()(T &result_in) { result = &result_in; gate.countDown(); } - MySink() : gate(), result(nullptr) {} + void operator()(std::exception_ptr exception_in) { + exception = exception_in; + gate.countDown(); + } + MySink() : gate(), result(nullptr), exception() {} }; MySink sink; signal_when_done(value, sink); sink.gate.await(); + if (sink.exception) { + std::rethrow_exception(sink.exception); + } return *sink.result; } +template <typename T> +T &&sync_wait(Lazy<T> &&value) { + return std::move(sync_wait(value)); +} + } |