From 8568274e54d065ae904907537dcf4ce6a9581bfa Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Wed, 12 Oct 2022 09:40:03 +0000 Subject: more coroutines --- vespalib/src/tests/coro/lazy/lazy_test.cpp | 74 ++++++++++++++++++++++++++ vespalib/src/vespa/vespalib/coro/detached.h | 12 +++++ vespalib/src/vespa/vespalib/coro/lazy.h | 79 +++++++++++++++++++++------- vespalib/src/vespa/vespalib/coro/sync_wait.h | 31 +++++++++-- 4 files changed, 173 insertions(+), 23 deletions(-) (limited to 'vespalib') diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp index a715e473aaf..4ba1c950ad0 100644 --- a/vespalib/src/tests/coro/lazy/lazy_test.cpp +++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp @@ -2,11 +2,35 @@ #include #include +#include #include +#include + using vespalib::coro::Lazy; using vespalib::coro::sync_wait; +std::vector threads; +struct JoinThreads { + ~JoinThreads() { + for (auto &thread: threads) { + thread.join(); + } + threads.clear(); + } +}; + +auto run_in_other_thread() { + struct awaiter { + bool await_ready() const noexcept { return false; } + void await_suspend(std::coroutine_handle<> handle) const { + threads.push_back(std::thread(handle)); + } + void await_resume() const noexcept {} + }; + return awaiter(); +} + Lazy make_lazy(int value) { co_return value; } @@ -21,6 +45,33 @@ Lazy async_sum(Lazy a, Lazy b) { co_return (co_await a + co_await b); } +Lazy> move_only_int() { + co_return std::make_unique(123); +} + +Lazy extract_rvalue() { + auto res = co_await move_only_int(); + co_return *res; +} + +Lazy will_throw() { + REQUIRE_FAILED("failed on purpose"); + co_return 123; +} + +template +Lazy forward_value(Lazy value) { + co_return co_await std::move(value); +} + +template +Lazy switch_thread(Lazy value) { + std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl; + co_await run_in_other_thread(); + std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl; + co_return co_await value; +} + TEST(LazyTest, simple_lazy_value) { auto lazy = make_lazy(42); auto result = sync_wait(lazy); @@ -41,4 +92,27 @@ TEST(LazyTest, async_sum_of_external_async_values) { EXPECT_EQ(result, 300); } +TEST(LazyTest, extract_rvalue_from_lazy_in_coroutine) { + auto lazy = extract_rvalue(); + auto result = sync_wait(lazy); + EXPECT_EQ(result, 123); +} + +TEST(LazyTest, extract_rvalue_from_lazy_in_sync_wait) { + auto result = sync_wait(move_only_int()); + EXPECT_EQ(*result, 123); +} + +TEST(LazyTest, calculate_result_in_another_thread) { + JoinThreads thread_guard; + auto result = sync_wait(switch_thread(make_lazy(7))); + EXPECT_EQ(result, 7); +} + +TEST(LazyTest, exceptions_are_propagated) { + JoinThreads thread_guard; + auto lazy = switch_thread(forward_value(will_throw())); + EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/vespa/vespalib/coro/detached.h b/vespalib/src/vespa/vespalib/coro/detached.h index 6d051e53121..5e3fa1452fa 100644 --- a/vespalib/src/vespa/vespalib/coro/detached.h +++ b/vespalib/src/vespa/vespalib/coro/detached.h @@ -7,6 +7,18 @@ namespace vespalib::coro { +/** + * coroutine return type + * + * The coroutine is eager (will not suspend in initial_suspend) and + * self destroying (will not suspend in final_suspend). The return + * value gives no way of interacting with the coroutine. Without any + * co_await operations this acts similar to a normal subroutine. Note + * that letting a detached coroutine wait for a Lazy will + * essentially attach it to the Lazy as a continuation and resume + * it, but will require the Lazy not to be deleted mid flight + * (started but not completed). + **/ struct Detached { struct promise_type { Detached get_return_object() { return {}; } diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h index b007f565c93..ddfaa07528b 100644 --- a/vespalib/src/vespa/vespalib/coro/lazy.h +++ b/vespalib/src/vespa/vespalib/coro/lazy.h @@ -5,9 +5,22 @@ #include #include #include +#include namespace vespalib::coro { +/** + * coroutine return type + * + * The coroutine is lazy (will suspend in initial_suspend) and + * destroyed form the outside (will suspend in final_suspend). Waiting + * for a Lazy using co_await will use symmetric transfer to suspend + * the waiting coroutine and resuming this one. The waiting coroutine + * is registered as a continuation and will be resumed again once the + * result is avalable (also using symmetric transfer). The result is + * assumed to be produced asynchronously. If you need to access it + * from the outside (in that specific thread); use sync_wait. + **/ template class [[nodiscard]] Lazy { public: @@ -18,47 +31,73 @@ public: struct awaiter { bool await_ready() const noexcept { return false; } std::coroutine_handle<> await_suspend(Handle handle) const noexcept { - auto waiter = handle.promise().waiter; - return waiter ? waiter : std::noop_coroutine(); + return handle.promise().waiter; } void await_resume() const noexcept {} }; return awaiter(); } - void return_value(T ret_value) noexcept { + void return_value(const T &ret_value) noexcept(std::is_nothrow_move_constructible_v) requires(std::copy_constructible) { + value = ret_value; + } + void return_value(T &&ret_value) noexcept(std::is_nothrow_move_constructible_v) { value = std::move(ret_value); } - static void unhandled_exception() { std::terminate(); } + void unhandled_exception() noexcept { + exception = std::current_exception(); + } std::optional value; + std::exception_ptr exception; std::coroutine_handle<> waiter; promise_type(promise_type &&) = delete; promise_type(const promise_type &) = delete; - promise_type() : value(std::nullopt), waiter(nullptr) {} + promise_type() : value(std::nullopt), exception(), waiter(std::noop_coroutine()) {} + T &result() & { + if (exception) { + std::rethrow_exception(exception); + } + return *value; + } + T &&result() && { + if (exception) { + std::rethrow_exception(exception); + } + return std::move(*value); + } }; using Handle = std::coroutine_handle; private: Handle _handle; - + + struct awaiter_base { + Handle handle; + awaiter_base(Handle handle_in) noexcept : handle(handle_in) {} + bool await_ready() const noexcept { return handle.done(); } + Handle await_suspend(std::coroutine_handle<> waiter) const noexcept { + handle.promise().waiter = waiter; + return handle; + } + }; + public: Lazy(const Lazy &) = delete; Lazy &operator=(const Lazy &) = delete; explicit Lazy(Handle handle_in) noexcept : _handle(handle_in) {} Lazy(Lazy &&rhs) noexcept : _handle(std::exchange(rhs._handle, nullptr)) {} - auto operator co_await() { - struct awaiter { - Handle handle; - bool await_ready() const noexcept { - return handle.done(); - } - Handle await_suspend(std::coroutine_handle<> waiter) const noexcept { - handle.promise().waiter = waiter; - return handle; - } - T &await_resume() const noexcept { - return *handle.promise().value; - } - awaiter(Handle handle_in) : handle(handle_in) {} + auto operator co_await() & noexcept { + struct awaiter : awaiter_base { + using awaiter_base::handle; + awaiter(Handle handle_in) noexcept : awaiter_base(handle_in) {} + decltype(auto) await_resume() const { return handle.promise().result(); } + }; + return awaiter(_handle); + } + auto operator co_await() && noexcept { + struct awaiter : awaiter_base { + using awaiter_base::handle; + awaiter(Handle handle_in) noexcept : awaiter_base(handle_in) {} + decltype(auto) await_resume() const { return std::move(handle.promise()).result(); } }; return awaiter(_handle); } diff --git a/vespalib/src/vespa/vespalib/coro/sync_wait.h b/vespalib/src/vespa/vespalib/coro/sync_wait.h index e6a8fdc43f6..bdea2dfc7f0 100644 --- a/vespalib/src/vespa/vespalib/coro/sync_wait.h +++ b/vespalib/src/vespa/vespalib/coro/sync_wait.h @@ -4,31 +4,56 @@ #include "detached.h" #include "lazy.h" -#include #include +#include +#include + namespace vespalib::coro { template Detached signal_when_done(Lazy &value, S &sink) { - sink(co_await value); + try { + sink(co_await value); + } catch (...) { + sink(std::current_exception()); + } } +/** + * Wait for a lazy value to be calculated (note that waiting for a + * value will also start calculating it). Make sure the thread waiting + * is not needed in the calculation of the value, or you will end up + * with a deadlock. + **/ template T &sync_wait(Lazy &value) { struct MySink { Gate gate; T *result; + std::exception_ptr exception; void operator()(T &result_in) { result = &result_in; gate.countDown(); } - MySink() : gate(), result(nullptr) {} + void operator()(std::exception_ptr exception_in) { + exception = exception_in; + gate.countDown(); + } + MySink() : gate(), result(nullptr), exception() {} }; MySink sink; signal_when_done(value, sink); sink.gate.await(); + if (sink.exception) { + std::rethrow_exception(sink.exception); + } return *sink.result; } +template +T &&sync_wait(Lazy &&value) { + return std::move(sync_wait(value)); +} + } -- cgit v1.2.3