summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2022-11-02 11:29:55 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2022-11-02 11:29:55 +0000
commit1737c2d97b98eb2079eba7003043ffafe77edbc6 (patch)
treec649382f9dadb8359579096647dbbdd17dddce17 /vespalib
parent88463ea1b79873a63bf0aa6aa2813e7abfa85659 (diff)
enable scheduling a coroutine on an executor
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/coro/lazy/lazy_test.cpp65
-rw-r--r--vespalib/src/vespa/vespalib/coro/schedule.h76
2 files changed, 111 insertions, 30 deletions
diff --git a/vespalib/src/tests/coro/lazy/lazy_test.cpp b/vespalib/src/tests/coro/lazy/lazy_test.cpp
index b838152249e..ead5e8e6427 100644
--- a/vespalib/src/tests/coro/lazy/lazy_test.cpp
+++ b/vespalib/src/tests/coro/lazy/lazy_test.cpp
@@ -2,37 +2,21 @@
#include <vespa/vespalib/coro/lazy.h>
#include <vespa/vespalib/coro/sync_wait.h>
+#include <vespa/vespalib/coro/schedule.h>
+#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/require.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <mutex>
#include <thread>
+using vespalib::Executor;
using vespalib::coro::Lazy;
+using vespalib::coro::ScheduleFailedException;
+using vespalib::coro::schedule;
using vespalib::coro::sync_wait;
-
-std::mutex thread_lock;
-std::vector<std::thread> threads;
-struct JoinThreads {
- ~JoinThreads() {
- for (auto &thread: threads) {
- thread.join();
- }
- threads.clear();
- }
-};
-
-auto run_in_other_thread() {
- struct awaiter {
- bool await_ready() const noexcept { return false; }
- void await_suspend(std::coroutine_handle<> handle) const {
- auto guard = std::lock_guard(thread_lock);
- threads.push_back(std::thread(handle));
- }
- void await_resume() const noexcept {}
- };
- return awaiter();
-}
+using vespalib::coro::try_schedule;
Lazy<int> make_lazy(int value) {
co_return value;
@@ -68,9 +52,17 @@ Lazy<T> forward_value(Lazy<T> value) {
}
template <typename T>
-Lazy<T> switch_thread(Lazy<T> value) {
+Lazy<std::pair<bool,T>> try_schedule_on(Executor &executor, Lazy<T> value) {
std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl;
- co_await run_in_other_thread();
+ bool accepted = co_await try_schedule(executor);
+ std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl;
+ co_return std::make_pair(accepted, co_await value);
+}
+
+template <typename T>
+Lazy<T> schedule_on(Executor &executor, Lazy<T> value) {
+ std::cerr << "switching from thread " << std::this_thread::get_id() << std::endl;
+ co_await schedule(executor);
std::cerr << "........... to thread " << std::this_thread::get_id() << std::endl;
co_return co_await value;
}
@@ -107,15 +99,28 @@ TEST(LazyTest, extract_rvalue_from_lazy_in_sync_wait) {
}
TEST(LazyTest, calculate_result_in_another_thread) {
- JoinThreads thread_guard;
- auto result = sync_wait(switch_thread(make_lazy(7)));
- EXPECT_EQ(result, 7);
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ auto result = sync_wait(try_schedule_on(executor, make_lazy(7)));
+ EXPECT_EQ(result.first, true);
+ EXPECT_EQ(result.second, 7);
+ auto result2 = sync_wait(schedule_on(executor, make_lazy(8)));
+ EXPECT_EQ(result2, 8);
}
TEST(LazyTest, exceptions_are_propagated) {
- JoinThreads thread_guard;
- auto lazy = switch_thread(forward_value(will_throw()));
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ auto lazy = try_schedule_on(executor, forward_value(will_throw()));
EXPECT_THROW(sync_wait(lazy), vespalib::RequireFailedException);
}
+TEST(LazyTest, not_able_to_switch_thread_if_executor_is_shut_down) {
+ vespalib::ThreadStackExecutor executor(1, 128_Ki);
+ executor.shutdown();
+ auto result = sync_wait(try_schedule_on(executor, make_lazy(7)));
+ EXPECT_EQ(result.first, false);
+ EXPECT_EQ(result.second, 7);
+ auto lazy = schedule_on(executor, make_lazy(8));
+ EXPECT_THROW(sync_wait(lazy), ScheduleFailedException);
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/vespa/vespalib/coro/schedule.h b/vespalib/src/vespa/vespalib/coro/schedule.h
new file mode 100644
index 00000000000..6dfa5b9536c
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/schedule.h
@@ -0,0 +1,76 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <coroutine>
+#include <exception>
+
+namespace vespalib::coro {
+
+struct ScheduleFailedException : std::runtime_error {
+ using std::runtime_error::runtime_error;
+};
+
+// Schedule the current coroutine on the given executor. Throws an
+// exception if the request was rejected by the executor.
+
+auto schedule(Executor &executor) {
+ struct [[nodiscard]] awaiter {
+ Executor &executor;
+ awaiter(Executor &executor_in)
+ : executor(executor_in) {}
+ bool await_ready() const noexcept { return false; }
+ void await_suspend(std::coroutine_handle<> handle) {
+ struct ResumeTask : Executor::Task {
+ std::coroutine_handle<> handle;
+ ResumeTask(std::coroutine_handle<> handle_in)
+ : handle(handle_in) {}
+ void run() override { handle.resume(); }
+ };
+ Executor::Task::UP task = std::make_unique<ResumeTask>(handle);
+ task = executor.execute(std::move(task));
+ if (task) {
+ throw ScheduleFailedException("rejected by executor");
+ }
+ }
+ void await_resume() const noexcept {}
+ };
+ return awaiter(executor);
+}
+
+// Try to schedule the current coroutine on the given executor. If the
+// awaiter returns true, the coroutine is now run by the executor. If
+// the awaiter returns false, the request was rejected by the executor
+// and the coroutine is still running in our original context.
+
+auto try_schedule(Executor &executor) {
+ struct [[nodiscard]] awaiter {
+ Executor &executor;
+ bool accepted;
+ awaiter(Executor &executor_in)
+ : executor(executor_in), accepted(true) {}
+ bool await_ready() const noexcept { return false; }
+ bool await_suspend(std::coroutine_handle<> handle) {
+ struct ResumeTask : Executor::Task {
+ std::coroutine_handle<> handle;
+ ResumeTask(std::coroutine_handle<> handle_in)
+ : handle(handle_in) {}
+ void run() override { handle.resume(); }
+ };
+ Executor::Task::UP task = std::make_unique<ResumeTask>(handle);
+ task = executor.execute(std::move(task));
+ if (task) {
+ // need to start with accepted == true to avoid race
+ // with handle.resume() from executor thread before
+ // await_suspend has returned.
+ accepted = false;
+ }
+ return accepted;
+ }
+ [[nodiscard]] bool await_resume() const noexcept { return accepted; }
+ };
+ return awaiter(executor);
+}
+
+}