diff options
-rw-r--r-- | vespalib/src/tests/coro/lazy/lazy_test.cpp | 63 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/completion.h | 104 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/lazy.h | 4 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/received.h | 46 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/schedule.h | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/sync_wait.h | 59 |
6 files changed, 209 insertions, 68 deletions
diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp index ead5e8e6427..ec27bf195ec 100644 --- a/vespalib/src/tests/coro/lazy/lazy_test.cpp +++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp @@ -1,10 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/coro/lazy.h> -#include <vespa/vespalib/coro/sync_wait.h> +#include <vespa/vespalib/coro/completion.h> #include <vespa/vespalib/coro/schedule.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/require.h> +#include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/gtest/gtest.h> #include <mutex> @@ -12,7 +13,9 @@ #include <thread> using vespalib::Executor; +using vespalib::Gate; using vespalib::coro::Lazy; +using vespalib::coro::Received; using vespalib::coro::ScheduleFailedException; using vespalib::coro::schedule; using vespalib::coro::sync_wait; @@ -56,7 +59,7 @@ Lazy<std::pair<bool,T>> try_schedule_on(Executor &executor, Lazy<T> value) { std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl; bool accepted = co_await try_schedule(executor); std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl; - co_return std::make_pair(accepted, co_await value); + co_return std::make_pair(accepted, co_await std::move(value)); } template <typename T> @@ -64,18 +67,18 @@ Lazy<T> schedule_on(Executor &executor, Lazy<T> value) { std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl; co_await schedule(executor); std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl; - co_return co_await value; + co_return co_await std::move(value); } TEST(LazyTest, simple_lazy_value) { auto lazy = make_lazy(42); - auto result = sync_wait(lazy); + auto result = sync_wait(std::move(lazy)); EXPECT_EQ(result, 42); } TEST(LazyTest, async_sum_of_async_values) { auto lazy = async_add_values(10, 20); - auto result = sync_wait(lazy); + auto result = sync_wait(std::move(lazy)); EXPECT_EQ(result, 30); } @@ -83,13 +86,13 @@ TEST(LazyTest, async_sum_of_external_async_values) { auto a = make_lazy(100); auto b = make_lazy(200); auto lazy = async_sum(std::move(a), std::move(b)); - auto result = sync_wait(lazy); + auto result = sync_wait(std::move(lazy)); EXPECT_EQ(result, 300); } TEST(LazyTest, extract_rvalue_from_lazy_in_coroutine) { auto lazy = extract_rvalue(); - auto result = sync_wait(lazy); + auto result = sync_wait(std::move(lazy)); EXPECT_EQ(result, 123); } @@ -110,7 +113,7 @@ TEST(LazyTest, calculate_result_in_another_thread) { TEST(LazyTest, exceptions_are_propagated) { vespalib::ThreadStackExecutor executor(1, 128_Ki); auto lazy = try_schedule_on(executor, forward_value(will_throw())); - EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException); + EXPECT_THROW(sync_wait(std::move(lazy)), vespalib::RequireFailedException); } TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) { @@ -120,7 +123,49 @@ TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) { EXPECT_EQ(result.first, false); EXPECT_EQ(result.second, 7); auto lazy = schedule_on(executor, make_lazy(8)); - EXPECT_THROW(sync_wait(lazy), ScheduleFailedException); + EXPECT_THROW(sync_wait(std::move(lazy)), ScheduleFailedException); +} + +TEST(LazyTest, async_wait_with_lambda) { + Gate gate; + Received<int> result; + vespalib::ThreadStackExecutor executor(1, 128_Ki); + auto lazy = schedule_on(executor, make_lazy(7)); + async_wait(std::move(lazy), [&](auto res) + { + result = res; + gate.countDown(); + }); + gate.await(); + EXPECT_EQ(result.get_value(), 7); +} + +TEST(LazyTest, async_wait_with_error) { + Gate gate; + Received<int> result; + vespalib::ThreadStackExecutor executor(1, 128_Ki); + auto lazy = schedule_on(executor, will_throw()); + async_wait(std::move(lazy), [&](auto res) + { + result = res; + gate.countDown(); + }); + gate.await(); + EXPECT_THROW(result.get_value(), vespalib::RequireFailedException); +} + +TEST(LazyTest, async_wait_with_move_only_result) { + Gate gate; + Received<std::unique_ptr<int>> result; + vespalib::ThreadStackExecutor executor(1, 128_Ki); + auto lazy = schedule_on(executor, move_only_int()); + async_wait(std::move(lazy), [&](auto res) + { + result = std::move(res); + gate.countDown(); + }); + gate.await(); + EXPECT_EQ(*(result.get_value()), 123); } GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/vespa/vespalib/coro/completion.h b/vespalib/src/vespa/vespalib/coro/completion.h new file mode 100644 index 00000000000..6533a718010 --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/completion.h @@ -0,0 +1,104 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "lazy.h" +#include "detached.h" +#include "received.h" + +#include <coroutine> +#include <exception> +#include <future> +#include <type_traits> + +namespace vespalib::coro { + +// Resume/start the coroutine responsible for calculating the result +// and signal the receiver when it completes or fails. Note that the +// detached coroutine will own both the coroutine calculating the +// result and the receiver that is later notified of the result. The +// detached coroutine will automatically self-destroy when it returns, +// thereby also destroying the value and receiver. This is the +// fundamental building block used to adapt the asynchronous result of +// a coroutine with external code. This also closely models abstract +// execution where the coroutine represented by Lazy<T> is the +// sender. Execution parameters can be encapsulated inside Lazy<T> +// using composition (for example which executor should run the +// coroutine). + +template <typename T, typename R> +Detached connect_resume(Lazy<T> value, R receiver) { + try { + receiver.set_value(co_await std::move(value)); + } catch (...) { + receiver.set_error(std::current_exception()); + } +} + +// replace Lazy<T> with std::future<T> to be able to synchronously +// wait for its completion. Implemented in terms of connect_resume. + +template <typename T> +std::future<T> make_future(Lazy<T> value) { + struct receiver { + std::promise<T> promise; + receiver() : promise() {} + void set_value(T value) { + promise.set_value(std::move(value)); + } + void set_error(std::exception_ptr error) { + promise.set_exception(error); + } + }; + receiver my_receiver; + auto future = my_receiver.promise.get_future(); + connect_resume(std::move(value), std::move(my_receiver)); + return future; +} + +// Create a receiver from a function object (typically a lambda +// closure) that takes a received value (stored receiver result) as +// its only parameter. + +template <typename T, typename F> +auto make_receiver(F &&f) { + struct receiver { + Received<T> result; + std::decay_t<F> fun; + receiver(F &&f) + : result(), fun(std::forward<F>(f)) {} + void set_value(T value) { + result.set_value(std::move(value)); + fun(std::move(result)); + } + void set_error(std::exception_ptr why) { + result.set_error(why); + fun(std::move(result)); + } + }; + return receiver(std::forward<F>(f)); +} + +/** + * Wait for a lazy value to be calculated synchronously. Make sure the + * thread waiting is not needed in the calculation of the value, or + * you will end up with a deadlock. + **/ +template <typename T> +T sync_wait(Lazy<T> value) { + return std::move(make_future(std::move(value)).get()); +} + +/** + * Wait for a lazy value to be calculated asynchronously; the provided + * callback will be called with a Received<T> when the Lazy<T> is + * done. Both the callback itself and the Lazy<T> will be destructed + * afterwards; cleaning up the coroutine tree representing the + * calculation. + **/ +template <typename T, typename F> +void async_wait(Lazy<T> value, F &&f) { + connect_resume(std::move(value), make_receiver<T>(std::forward<F>(f))); +} + +} diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h index 5a10c05bc24..144b5c945f0 100644 --- a/vespalib/src/vespa/vespalib/coro/lazy.h +++ b/vespalib/src/vespa/vespalib/coro/lazy.h @@ -64,6 +64,7 @@ public: } return std::move(*value); } + ~promise_type(); }; using Handle = std::coroutine_handle<promise_type>; @@ -108,4 +109,7 @@ public: } }; +template<typename T> +Lazy<T>::promise_type::~promise_type() = default; + } diff --git a/vespalib/src/vespa/vespalib/coro/received.h b/vespalib/src/vespa/vespalib/coro/received.h new file mode 100644 index 00000000000..4f2efddcfa1 --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/received.h @@ -0,0 +1,46 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <variant> +#include <exception> +#include <stdexcept> + +namespace vespalib::coro { + +struct UnavailableResultException : std::runtime_error { + using std::runtime_error::runtime_error; +}; + +/** + * Simple value wrapper that stores the result observed by a receiver + * (value/error/done). A receiver is the continuation of an + * asynchronous operation in the world of executors. + **/ +template <std::movable T> +class Received { +private: + std::variant<std::exception_ptr,T> _value; +public: + Received() : _value() {} + void set_value(T value) { _value.template emplace<1>(std::move(value)); } + void set_error(std::exception_ptr exception) { _value.template emplace<0>(exception); } + void set_done() { _value.template emplace<0>(nullptr); } + bool has_value() const { return (_value.index() == 1); } + bool has_error() const { return (_value.index() == 0) && bool(std::get<0>(_value)); } + bool was_canceled() const { return !has_value() && !has_error(); } + std::exception_ptr get_error() const { return has_error() ? std::get<0>(_value) : std::exception_ptr(); } + T get_value() { + if (_value.index() == 1) { + return std::move(std::get<1>(_value)); + } else { + if (auto ex = std::get<0>(_value)) { + std::rethrow_exception(ex); + } else { + throw UnavailableResultException("tried to access the result of a canceled operation"); + } + } + } +}; + +} diff --git a/vespalib/src/vespa/vespalib/coro/schedule.h b/vespalib/src/vespa/vespalib/coro/schedule.h index 6dfa5b9536c..71a384f356f 100644 --- a/vespalib/src/vespa/vespalib/coro/schedule.h +++ b/vespalib/src/vespa/vespalib/coro/schedule.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/executor.h> #include <coroutine> #include <exception> +#include <stdexcept> namespace vespalib::coro { diff --git a/vespalib/src/vespa/vespalib/coro/sync_wait.h b/vespalib/src/vespa/vespalib/coro/sync_wait.h deleted file mode 100644 index bdea2dfc7f0..00000000000 --- a/vespalib/src/vespa/vespalib/coro/sync_wait.h +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "detached.h" -#include "lazy.h" -#include <vespa/vespalib/util/gate.h> - -#include <coroutine> -#include <exception> - -namespace vespalib::coro { - -template <typename T, typename S> -Detached signal_when_done(Lazy<T> &value, S &sink) { - try { - sink(co_await value); - } catch (...) { - sink(std::current_exception()); - } -} - -/** - * Wait for a lazy value to be calculated (note that waiting for a - * value will also start calculating it). Make sure the thread waiting - * is not needed in the calculation of the value, or you will end up - * with a deadlock. - **/ -template <typename T> -T &sync_wait(Lazy<T> &value) { - struct MySink { - Gate gate; - T *result; - std::exception_ptr exception; - void operator()(T &result_in) { - result = &result_in; - gate.countDown(); - } - void operator()(std::exception_ptr exception_in) { - exception = exception_in; - gate.countDown(); - } - MySink() : gate(), result(nullptr), exception() {} - }; - MySink sink; - signal_when_done(value, sink); - sink.gate.await(); - if (sink.exception) { - std::rethrow_exception(sink.exception); - } - return *sink.result; -} - -template <typename T> -T &&sync_wait(Lazy<T> &&value) { - return std::move(sync_wait(value)); -} - -} |