// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include "received.h" #include #include namespace vespalib::coro { // State representing that someone (waiter) is waiting for something // (result). This object cannot be moved or copied. template struct PromiseState { Received result; std::coroutine_handle<> waiter; PromiseState(const PromiseState &) = delete; PromiseState &operator=(const PromiseState &) = delete; PromiseState(PromiseState &&) = delete; PromiseState &operator=(PromiseState &&) = delete; PromiseState() noexcept : result(), waiter(std::noop_coroutine()) {} ~PromiseState(); }; template PromiseState::~PromiseState() = default; // A thin (smart) wrapper referencing a PromiseState representing // that a coroutine is waiting for a value. This class acts as a // receiver in order to set the result value. When the owning // reference is deleted, the waiting coroutine will be resumed. template class WaitingFor { private: PromiseState *_state; WaitingFor(PromiseState *state) noexcept : _state(state) {} public: WaitingFor() noexcept : _state(nullptr) {} WaitingFor(WaitingFor &&rhs) noexcept : _state(std::exchange(rhs._state, nullptr)) {} WaitingFor &operator=(WaitingFor &&rhs) { if (_state) { _state->result.set_done(); // canceled _state->waiter.resume(); } _state = std::exchange(rhs._state, nullptr); return *this; } WaitingFor(const WaitingFor &rhs) = delete; WaitingFor &operator=(const WaitingFor &rhs) = delete; ~WaitingFor(); operator bool() const noexcept { return _state; } template void set_value(RET &&value) { _state->result.set_value(std::forward(value)); } void set_error(std::exception_ptr exception) { _state->result.set_error(exception); } void set_done() { _state->result.set_done(); } void *release() { return std::exchange(_state, nullptr); } static WaitingFor from_pointer(void *ptr) { PromiseState *state = reinterpret_cast*>(ptr); return {state}; } static WaitingFor from_state(PromiseState &state) { return {&state}; } // Unasking the question. This will drop the internal reference to // the promise state and return the handle for the waiting // coroutine. A function responsible for starting an async // operation may chose to do 'wf.set_value()' followed by // 'return wf.mu()' to convert the async operation to a sync // operation by immediately resuming the waiting coroutine (by // symmetrically transferring control to itself). [[nodiscard]] std::coroutine_handle<> mu() { auto handle = std::exchange(_state->waiter, std::noop_coroutine()); _state = nullptr; return handle; } // If some branch in the async start function wants to return mu, // other branches can return nop. This is to help the compiler // figure out the return type of lambdas, since // std::noop_coroutine() is a distinct type. [[nodiscard]] static std::coroutine_handle<> nop() noexcept { return std::noop_coroutine(); } }; template WaitingFor::~WaitingFor() { if (_state) { _state->waiter.resume(); } } static_assert(receiver_of, int>); static_assert(receiver_of>, std::unique_ptr>); // concept indicating that F is a function that starts an async // operation with T as result. The result will eventually be delivered // to the WaitingFor passed to the function. This function has // optional support for symmetric transfer to switch to another // coroutine as a side-effect of starting the async operation. This // also enables the function to change the operation form async to // sync by setting the value directly in the function and returning // wf.mu() template concept start_async_op = requires(F &&f) { std::decay_t(std::forward(f)); } && (requires(std::decay_t cpy, WaitingFor wf) { { cpy(std::move(wf)) } -> std::same_as; } || requires(std::decay_t cpy, WaitingFor wf) { { cpy(std::move(wf)) } -> std::same_as>; }); // Create a custom awaiter that will return a value of type T when the // coroutine is resumed. The operation waited upon is represented by // the function object used to start it. Note that await_ready will // always return false, since the coroutine needs to be suspended in // order to create the WaitingFor object needed. Also, the // WaitingFor api implies that the value will be set from the // outside and thus cannot be ready up-front. Also note that // await_resume must return T by value, since the awaiter containing // the result is a temporary object. template requires start_async_op auto wait_for(F &&on_suspend) { struct awaiter final : PromiseState { using PromiseState::result; using PromiseState::waiter; std::decay_t on_suspend; awaiter(F &&on_suspend_in) : PromiseState(), on_suspend(std::forward(on_suspend_in)) {} bool await_ready() const noexcept { return false; } T await_resume() { return std::move(result).get_value(); } decltype(auto) await_suspend(std::coroutine_handle<> handle) __attribute__((noinline)) { waiter = handle; return on_suspend(WaitingFor::from_state(*this)); } }; return awaiter(std::forward(on_suspend)); } }