summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2022-10-12 09:40:03 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2022-10-12 15:24:29 +0000
commit8568274e54d065ae904907537dcf4ce6a9581bfa (patch)
treebf3ad64563670aca5bfc98c88171fe9bc7443280 /vespalib
parent0d6d4159ba109f78fd9fea0c6b5e8d940ffc7e90 (diff)
more coroutines
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/coro/lazy/lazy_test.cpp74
-rw-r--r--vespalib/src/vespa/vespalib/coro/detached.h12
-rw-r--r--vespalib/src/vespa/vespalib/coro/lazy.h79
-rw-r--r--vespalib/src/vespa/vespalib/coro/sync_wait.h31
4 files changed, 173 insertions, 23 deletions
diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp
index a715e473aaf..4ba1c950ad0 100644
--- a/vespalib/src/tests/coro/lazy/lazy_test.cpp
+++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp
@@ -2,11 +2,35 @@
#include <vespa/vespalib/coro/lazy.h>
#include <vespa/vespalib/coro/sync_wait.h>
+#include <vespa/vespalib/util/require.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <thread>
+
using vespalib::coro::Lazy;
using vespalib::coro::sync_wait;
+std::vector<std::thread> threads;
+struct JoinThreads {
+ ~JoinThreads() {
+ for (auto &thread: threads) {
+ thread.join();
+ }
+ threads.clear();
+ }
+};
+
+auto run_in_other_thread() {
+ struct awaiter {
+ bool await_ready() const noexcept { return false; }
+ void await_suspend(std::coroutine_handle<> handle) const {
+ threads.push_back(std::thread(handle));
+ }
+ void await_resume() const noexcept {}
+ };
+ return awaiter();
+}
+
Lazy<int> make_lazy(int value) {
co_return value;
}
@@ -21,6 +45,33 @@ Lazy<int> async_sum(Lazy<int> a, Lazy<int> b) {
co_return (co_await a + co_await b);
}
+Lazy<std::unique_ptr<int>> move_only_int() {
+ co_return std::make_unique<int>(123);
+}
+
+Lazy<int> extract_rvalue() {
+ auto res = co_await move_only_int();
+ co_return *res;
+}
+
+Lazy<int> will_throw() {
+ REQUIRE_FAILED("failed on purpose");
+ co_return 123;
+}
+
+template<typename T>
+Lazy<T> forward_value(Lazy<T> value) {
+ co_return co_await std::move(value);
+}
+
+template <typename T>
+Lazy<T> switch_thread(Lazy<T> value) {
+ std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl;
+ co_await run_in_other_thread();
+ std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl;
+ co_return co_await value;
+}
+
TEST(LazyTest, simple_lazy_value) {
auto lazy = make_lazy(42);
auto result = sync_wait(lazy);
@@ -41,4 +92,27 @@ TEST(LazyTest, async_sum_of_external_async_values) {
EXPECT_EQ(result, 300);
}
+TEST(LazyTest, extract_rvalue_from_lazy_in_coroutine) {
+ auto lazy = extract_rvalue();
+ auto result = sync_wait(lazy);
+ EXPECT_EQ(result, 123);
+}
+
+TEST(LazyTest, extract_rvalue_from_lazy_in_sync_wait) {
+ auto result = sync_wait(move_only_int());
+ EXPECT_EQ(*result, 123);
+}
+
+TEST(LazyTest, calculate_result_in_another_thread) {
+ JoinThreads thread_guard;
+ auto result = sync_wait(switch_thread(make_lazy(7)));
+ EXPECT_EQ(result, 7);
+}
+
+TEST(LazyTest, exceptions_are_propagated) {
+ JoinThreads thread_guard;
+ auto lazy = switch_thread(forward_value(will_throw()));
+ EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException);
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/vespa/vespalib/coro/detached.h b/vespalib/src/vespa/vespalib/coro/detached.h
index 6d051e53121..5e3fa1452fa 100644
--- a/vespalib/src/vespa/vespalib/coro/detached.h
+++ b/vespalib/src/vespa/vespalib/coro/detached.h
@@ -7,6 +7,18 @@
namespace vespalib::coro {
+/**
+ * coroutine return type
+ *
+ * The coroutine is eager (will not suspend in initial_suspend) and
+ * self destroying (will not suspend in final_suspend). The return
+ * value gives no way of interacting with the coroutine. Without any
+ * co_await operations this acts similar to a normal subroutine. Note
+ * that letting a detached coroutine wait for a Lazy<T> will
+ * essentially attach it to the Lazy<T> as a continuation and resume
+ * it, but will require the Lazy<T> not to be deleted mid flight
+ * (started but not completed).
+ **/
struct Detached {
struct promise_type {
Detached get_return_object() { return {}; }
diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h
index b007f565c93..ddfaa07528b 100644
--- a/vespalib/src/vespa/vespalib/coro/lazy.h
+++ b/vespalib/src/vespa/vespalib/coro/lazy.h
@@ -5,9 +5,22 @@
#include <concepts>
#include <coroutine>
#include <optional>
+#include <exception>
namespace vespalib::coro {
+/**
+ * coroutine return type
+ *
+ * The coroutine is lazy (will suspend in initial_suspend) and
+ * destroyed form the outside (will suspend in final_suspend). Waiting
+ * for a Lazy<T> using co_await will use symmetric transfer to suspend
+ * the waiting coroutine and resuming this one. The waiting coroutine
+ * is registered as a continuation and will be resumed again once the
+ * result is avalable (also using symmetric transfer). The result is
+ * assumed to be produced asynchronously. If you need to access it
+ * from the outside (in that specific thread); use sync_wait.
+ **/
template <std::movable T>
class [[nodiscard]] Lazy {
public:
@@ -18,47 +31,73 @@ public:
struct awaiter {
bool await_ready() const noexcept { return false; }
std::coroutine_handle<> await_suspend(Handle handle) const noexcept {
- auto waiter = handle.promise().waiter;
- return waiter ? waiter : std::noop_coroutine();
+ return handle.promise().waiter;
}
void await_resume() const noexcept {}
};
return awaiter();
}
- void return_value(T ret_value) noexcept {
+ void return_value(const T &ret_value) noexcept(std::is_nothrow_move_constructible_v<T>) requires(std::copy_constructible<T>) {
+ value = ret_value;
+ }
+ void return_value(T &&ret_value) noexcept(std::is_nothrow_move_constructible_v<T>) {
value = std::move(ret_value);
}
- static void unhandled_exception() { std::terminate(); }
+ void unhandled_exception() noexcept {
+ exception = std::current_exception();
+ }
std::optional<T> value;
+ std::exception_ptr exception;
std::coroutine_handle<> waiter;
promise_type(promise_type &&) = delete;
promise_type(const promise_type &) = delete;
- promise_type() : value(std::nullopt), waiter(nullptr) {}
+ promise_type() : value(std::nullopt), exception(), waiter(std::noop_coroutine()) {}
+ T &result() & {
+ if (exception) {
+ std::rethrow_exception(exception);
+ }
+ return *value;
+ }
+ T &&result() && {
+ if (exception) {
+ std::rethrow_exception(exception);
+ }
+ return std::move(*value);
+ }
};
using Handle = std::coroutine_handle<promise_type>;
private:
Handle _handle;
-
+
+ struct awaiter_base {
+ Handle handle;
+ awaiter_base(Handle handle_in) noexcept : handle(handle_in) {}
+ bool await_ready() const noexcept { return handle.done(); }
+ Handle await_suspend(std::coroutine_handle<> waiter) const noexcept {
+ handle.promise().waiter = waiter;
+ return handle;
+ }
+ };
+
public:
Lazy(const Lazy &) = delete;
Lazy &operator=(const Lazy &) = delete;
explicit Lazy(Handle handle_in) noexcept : _handle(handle_in) {}
Lazy(Lazy &&rhs) noexcept : _handle(std::exchange(rhs._handle, nullptr)) {}
- auto operator co_await() {
- struct awaiter {
- Handle handle;
- bool await_ready() const noexcept {
- return handle.done();
- }
- Handle await_suspend(std::coroutine_handle<> waiter) const noexcept {
- handle.promise().waiter = waiter;
- return handle;
- }
- T &await_resume() const noexcept {
- return *handle.promise().value;
- }
- awaiter(Handle handle_in) : handle(handle_in) {}
+ auto operator co_await() & noexcept {
+ struct awaiter : awaiter_base {
+ using awaiter_base::handle;
+ awaiter(Handle handle_in) noexcept : awaiter_base(handle_in) {}
+ decltype(auto) await_resume() const { return handle.promise().result(); }
+ };
+ return awaiter(_handle);
+ }
+ auto operator co_await() && noexcept {
+ struct awaiter : awaiter_base {
+ using awaiter_base::handle;
+ awaiter(Handle handle_in) noexcept : awaiter_base(handle_in) {}
+ decltype(auto) await_resume() const { return std::move(handle.promise()).result(); }
};
return awaiter(_handle);
}
diff --git a/vespalib/src/vespa/vespalib/coro/sync_wait.h b/vespalib/src/vespa/vespalib/coro/sync_wait.h
index e6a8fdc43f6..bdea2dfc7f0 100644
--- a/vespalib/src/vespa/vespalib/coro/sync_wait.h
+++ b/vespalib/src/vespa/vespalib/coro/sync_wait.h
@@ -4,31 +4,56 @@
#include "detached.h"
#include "lazy.h"
-#include <coroutine>
#include <vespa/vespalib/util/gate.h>
+#include <coroutine>
+#include <exception>
+
namespace vespalib::coro {
template <typename T, typename S>
Detached signal_when_done(Lazy<T> &value, S &sink) {
- sink(co_await value);
+ try {
+ sink(co_await value);
+ } catch (...) {
+ sink(std::current_exception());
+ }
}
+/**
+ * Wait for a lazy value to be calculated (note that waiting for a
+ * value will also start calculating it). Make sure the thread waiting
+ * is not needed in the calculation of the value, or you will end up
+ * with a deadlock.
+ **/
template <typename T>
T &sync_wait(Lazy<T> &value) {
struct MySink {
Gate gate;
T *result;
+ std::exception_ptr exception;
void operator()(T &result_in) {
result = &result_in;
gate.countDown();
}
- MySink() : gate(), result(nullptr) {}
+ void operator()(std::exception_ptr exception_in) {
+ exception = exception_in;
+ gate.countDown();
+ }
+ MySink() : gate(), result(nullptr), exception() {}
};
MySink sink;
signal_when_done(value, sink);
sink.gate.await();
+ if (sink.exception) {
+ std::rethrow_exception(sink.exception);
+ }
return *sink.result;
}
+template <typename T>
+T &&sync_wait(Lazy<T> &&value) {
+ return std::move(sync_wait(value));
+}
+
}