summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHÃ¥vard Pettersen <3535158+havardpe@users.noreply.github.com>2022-11-07 16:44:01 +0100
committerGitHub <noreply@github.com>2022-11-07 16:44:01 +0100
commitd29cb7e64a30a93b4ab445c872449809cdde6bcd (patch)
tree287650ff600b6bbeda4c975897eef4b63780c60a
parent0f5a0bc7e1ac2b85c7bf7df2debade99a3c5f2b2 (diff)
parent8f8b70f57bf8b5174ee67a0404c14eecd5b73b4e (diff)
Merge pull request #24759 from vespa-engine/havardpe/wire-the-completion-of-a-coroutine-into-other-code
wait for the completion of a Lazy<T> in non-coroutine code
-rw-r--r--vespalib/src/tests/coro/lazy/lazy_test.cpp63
-rw-r--r--vespalib/src/vespa/vespalib/coro/completion.h104
-rw-r--r--vespalib/src/vespa/vespalib/coro/lazy.h4
-rw-r--r--vespalib/src/vespa/vespalib/coro/received.h46
-rw-r--r--vespalib/src/vespa/vespalib/coro/schedule.h1
-rw-r--r--vespalib/src/vespa/vespalib/coro/sync_wait.h59
6 files changed, 209 insertions, 68 deletions
diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp
index ead5e8e6427..ec27bf195ec 100644
--- a/vespalib/src/tests/coro/lazy/lazy_test.cpp
+++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp
@@ -1,10 +1,11 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/coro/lazy.h>
-#include <vespa/vespalib/coro/sync_wait.h>
+#include <vespa/vespalib/coro/completion.h>
#include <vespa/vespalib/coro/schedule.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/require.h>
+#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <mutex>
@@ -12,7 +13,9 @@
#include <thread>
using vespalib::Executor;
+using vespalib::Gate;
using vespalib::coro::Lazy;
+using vespalib::coro::Received;
using vespalib::coro::ScheduleFailedException;
using vespalib::coro::schedule;
using vespalib::coro::sync_wait;
@@ -56,7 +59,7 @@ Lazy<std::pair<bool,T>> try_schedule_on(Executor &executor, Lazy<T> value) {
std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl;
bool accepted = co_await try_schedule(executor);
std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl;
- co_return std::make_pair(accepted, co_await value);
+ co_return std::make_pair(accepted, co_await std::move(value));
}
template <typename T>
@@ -64,18 +67,18 @@ Lazy<T> schedule_on(Executor &executor, Lazy<T> value) {
std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl;
co_await schedule(executor);
std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl;
- co_return co_await value;
+ co_return co_await std::move(value);
}
TEST(LazyTest, simple_lazy_value) {
auto lazy = make_lazy(42);
- auto result = sync_wait(lazy);
+ auto result = sync_wait(std::move(lazy));
EXPECT_EQ(result, 42);
}
TEST(LazyTest, async_sum_of_async_values) {
auto lazy = async_add_values(10, 20);
- auto result = sync_wait(lazy);
+ auto result = sync_wait(std::move(lazy));
EXPECT_EQ(result, 30);
}
@@ -83,13 +86,13 @@ TEST(LazyTest, async_sum_of_external_async_values) {
auto a = make_lazy(100);
auto b = make_lazy(200);
auto lazy = async_sum(std::move(a), std::move(b));
- auto result = sync_wait(lazy);
+ auto result = sync_wait(std::move(lazy));
EXPECT_EQ(result, 300);
}
TEST(LazyTest, extract_rvalue_from_lazy_in_coroutine) {
auto lazy = extract_rvalue();
- auto result = sync_wait(lazy);
+ auto result = sync_wait(std::move(lazy));
EXPECT_EQ(result, 123);
}
@@ -110,7 +113,7 @@ TEST(LazyTest, calculate_result_in_another_thread) {
TEST(LazyTest, exceptions_are_propagated) {
vespalib::ThreadStackExecutor executor(1, 128_Ki);
auto lazy = try_schedule_on(executor, forward_value(will_throw()));
- EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException);
+ EXPECT_THROW(sync_wait(std::move(lazy)), vespalib::RequireFailedException);
}
TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) {
@@ -120,7 +123,49 @@ TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) {
EXPECT_EQ(result.first, false);
EXPECT_EQ(result.second, 7);
auto lazy = schedule_on(executor, make_lazy(8));
- EXPECT_THROW(sync_wait(lazy), ScheduleFailedException);
+ EXPECT_THROW(sync_wait(std::move(lazy)), ScheduleFailedException);
+}
+
+TEST(LazyTest, async_wait_with_lambda) {
+ Gate gate;
+ Received<int> result;
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ auto lazy = schedule_on(executor, make_lazy(7));
+ async_wait(std::move(lazy), [&](auto res)
+ {
+ result = res;
+ gate.countDown();
+ });
+ gate.await();
+ EXPECT_EQ(result.get_value(), 7);
+}
+
+TEST(LazyTest, async_wait_with_error) {
+ Gate gate;
+ Received<int> result;
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ auto lazy = schedule_on(executor, will_throw());
+ async_wait(std::move(lazy), [&](auto res)
+ {
+ result = res;
+ gate.countDown();
+ });
+ gate.await();
+ EXPECT_THROW(result.get_value(), vespalib::RequireFailedException);
+}
+
+TEST(LazyTest, async_wait_with_move_only_result) {
+ Gate gate;
+ Received<std::unique_ptr<int>> result;
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ auto lazy = schedule_on(executor, move_only_int());
+ async_wait(std::move(lazy), [&](auto res)
+ {
+ result = std::move(res);
+ gate.countDown();
+ });
+ gate.await();
+ EXPECT_EQ(*(result.get_value()), 123);
}
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/vespa/vespalib/coro/completion.h b/vespalib/src/vespa/vespalib/coro/completion.h
new file mode 100644
index 00000000000..f323d8c68bf
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/completion.h
@@ -0,0 +1,104 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "lazy.h"
+#include "detached.h"
+#include "received.h"
+
+#include <coroutine>
+#include <exception>
+#include <future>
+#include <type_traits>
+
+namespace vespalib::coro {
+
+// Resume/start the coroutine responsible for calculating the result
+// and signal the receiver when it completes or fails. Note that the
+// detached coroutine will own both the coroutine calculating the
+// result and the receiver that is later notified of the result. The
+// detached coroutine will automatically self-destroy when it returns,
+// thereby also destroying the value and receiver. This is the
+// fundamental building block used to adapt the asynchronous result of
+// a coroutine with external code. This also closely models abstract
+// execution where the coroutine represented by Lazy<T> is the
+// sender. Execution parameters can be encapsulated inside Lazy<T>
+// using composition (for example which executor should run the
+// coroutine).
+
+template <typename T, typename R>
+Detached connect_resume(Lazy<T> value, R receiver) {
+ try {
+ receiver.set_value(co_await std::move(value));
+ } catch (...) {
+ receiver.set_error(std::current_exception());
+ }
+}
+
+// replace Lazy<T> with std::future<T> to be able to synchronously
+// wait for its completion. Implemented in terms of connect_resume.
+
+template <typename T>
+std::future<T> make_future(Lazy<T> value) {
+ struct receiver {
+ std::promise<T> promise;
+ receiver() : promise() {}
+ void set_value(T value) {
+ promise.set_value(std::move(value));
+ }
+ void set_error(std::exception_ptr error) {
+ promise.set_exception(error);
+ }
+ };
+ receiver my_receiver;
+ auto future = my_receiver.promise.get_future();
+ connect_resume(std::move(value), std::move(my_receiver));
+ return future;
+}
+
+// Create a receiver from a function object (typically a lambda
+// closure) that takes a received value (stored receiver result) as
+// its only parameter.
+
+template <typename T, typename F>
+auto make_receiver(F &&f) {
+ struct receiver {
+ Received<T> result;
+ std::decay_t<F> fun;
+ receiver(F &&f)
+ : result(), fun(std::forward<F>(f)) {}
+ void set_value(T value) {
+ result.set_value(std::move(value));
+ fun(std::move(result));
+ }
+ void set_error(std::exception_ptr why) {
+ result.set_error(why);
+ fun(std::move(result));
+ }
+ };
+ return receiver(std::forward<F>(f));
+}
+
+/**
+ * Wait for a lazy value to be calculated synchronously. Make sure the
+ * thread waiting is not needed in the calculation of the value, or
+ * you will end up with a deadlock.
+ **/
+template <typename T>
+T sync_wait(Lazy<T> value) {
+ return make_future(std::move(value)).get();
+}
+
+/**
+ * Wait for a lazy value to be calculated asynchronously; the provided
+ * callback will be called with a Received<T> when the Lazy<T> is
+ * done. Both the callback itself and the Lazy<T> will be destructed
+ * afterwards; cleaning up the coroutine tree representing the
+ * calculation.
+ **/
+template <typename T, typename F>
+void async_wait(Lazy<T> value, F &&f) {
+ connect_resume(std::move(value), make_receiver<T>(std::forward<F>(f)));
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h
index 5a10c05bc24..144b5c945f0 100644
--- a/vespalib/src/vespa/vespalib/coro/lazy.h
+++ b/vespalib/src/vespa/vespalib/coro/lazy.h
@@ -64,6 +64,7 @@ public:
}
return std::move(*value);
}
+ ~promise_type();
};
using Handle = std::coroutine_handle<promise_type>;
@@ -108,4 +109,7 @@ public:
}
};
+template<typename T>
+Lazy<T>::promise_type::~promise_type() = default;
+
}
diff --git a/vespalib/src/vespa/vespalib/coro/received.h b/vespalib/src/vespa/vespalib/coro/received.h
new file mode 100644
index 00000000000..4f2efddcfa1
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/received.h
@@ -0,0 +1,46 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <variant>
+#include <exception>
+#include <stdexcept>
+
+namespace vespalib::coro {
+
+struct UnavailableResultException : std::runtime_error {
+ using std::runtime_error::runtime_error;
+};
+
+/**
+ * Simple value wrapper that stores the result observed by a receiver
+ * (value/error/done). A receiver is the continuation of an
+ * asynchronous operation in the world of executors.
+ **/
+template <std::movable T>
+class Received {
+private:
+ std::variant<std::exception_ptr,T> _value;
+public:
+ Received() : _value() {}
+ void set_value(T value) { _value.template emplace<1>(std::move(value)); }
+ void set_error(std::exception_ptr exception) { _value.template emplace<0>(exception); }
+ void set_done() { _value.template emplace<0>(nullptr); }
+ bool has_value() const { return (_value.index() == 1); }
+ bool has_error() const { return (_value.index() == 0) && bool(std::get<0>(_value)); }
+ bool was_canceled() const { return !has_value() && !has_error(); }
+ std::exception_ptr get_error() const { return has_error() ? std::get<0>(_value) : std::exception_ptr(); }
+ T get_value() {
+ if (_value.index() == 1) {
+ return std::move(std::get<1>(_value));
+ } else {
+ if (auto ex = std::get<0>(_value)) {
+ std::rethrow_exception(ex);
+ } else {
+ throw UnavailableResultException("tried to access the result of a canceled operation");
+ }
+ }
+ }
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/coro/schedule.h b/vespalib/src/vespa/vespalib/coro/schedule.h
index 6dfa5b9536c..71a384f356f 100644
--- a/vespalib/src/vespa/vespalib/coro/schedule.h
+++ b/vespalib/src/vespa/vespalib/coro/schedule.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/executor.h>
#include <coroutine>
#include <exception>
+#include <stdexcept>
namespace vespalib::coro {
diff --git a/vespalib/src/vespa/vespalib/coro/sync_wait.h b/vespalib/src/vespa/vespalib/coro/sync_wait.h
deleted file mode 100644
index bdea2dfc7f0..00000000000
--- a/vespalib/src/vespa/vespalib/coro/sync_wait.h
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "detached.h"
-#include "lazy.h"
-#include <vespa/vespalib/util/gate.h>
-
-#include <coroutine>
-#include <exception>
-
-namespace vespalib::coro {
-
-template <typename T, typename S>
-Detached signal_when_done(Lazy<T> &value, S &sink) {
- try {
- sink(co_await value);
- } catch (...) {
- sink(std::current_exception());
- }
-}
-
-/**
- * Wait for a lazy value to be calculated (note that waiting for a
- * value will also start calculating it). Make sure the thread waiting
- * is not needed in the calculation of the value, or you will end up
- * with a deadlock.
- **/
-template <typename T>
-T &sync_wait(Lazy<T> &value) {
- struct MySink {
- Gate gate;
- T *result;
- std::exception_ptr exception;
- void operator()(T &result_in) {
- result = &result_in;
- gate.countDown();
- }
- void operator()(std::exception_ptr exception_in) {
- exception = exception_in;
- gate.countDown();
- }
- MySink() : gate(), result(nullptr), exception() {}
- };
- MySink sink;
- signal_when_done(value, sink);
- sink.gate.await();
- if (sink.exception) {
- std::rethrow_exception(sink.exception);
- }
- return *sink.result;
-}
-
-template <typename T>
-T &&sync_wait(Lazy<T> &&value) {
- return std::move(sync_wait(value));
-}
-
-}