diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2022-12-21 12:30:34 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2022-12-23 12:00:30 +0000 |
commit | 7d5ebdb57f0d79effe57ed26b54ac66ec62decac (patch) | |
tree | c86f38bd2f69c88275e16fd0dd97053c43eb5136 /vespalib | |
parent | eb9c62eebca80aec6b80b24d167407764c97a048 (diff) |
owner, shutdown and error propagation for async io
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/coro/async_io/async_io_test.cpp | 31 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/async_io.cpp | 324 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/async_io.h | 36 |
3 files changed, 297 insertions, 94 deletions
diff --git a/vespalib/src/tests/coro/async_io/async_io_test.cpp b/vespalib/src/tests/coro/async_io/async_io_test.cpp index 9b8a98a8f69..a506a5dd0d4 100644 --- a/vespalib/src/tests/coro/async_io/async_io_test.cpp +++ b/vespalib/src/tests/coro/async_io/async_io_test.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/coro/lazy.h> +#include <vespa/vespalib/coro/detached.h> #include <vespa/vespalib/coro/completion.h> #include <vespa/vespalib/coro/async_io.h> #include <vespa/vespalib/net/socket_spec.h> @@ -12,6 +13,13 @@ using namespace vespalib; using namespace vespalib::coro; +Detached self_exiting_run_loop(AsyncIo::SP async) { + for (size_t i = 0; co_await async->schedule(); ++i) { + fprintf(stderr, "self_exiting_run_loop -> current value: %zu\n", i); + } + fprintf(stderr, "self_exiting_run_loop -> exiting\n"); +} + Work run_loop(AsyncIo &async, int a, int b) { for (int i = a; i < b; ++i) { co_await async.schedule(); @@ -22,20 +30,29 @@ Work run_loop(AsyncIo &async, int a, int b) { TEST(AsyncIoTest, create_async_io) { auto async = AsyncIo::create(); - ASSERT_TRUE(async); - fprintf(stderr, "async_io impl: %s\n", async->get_impl_spec().c_str()); + AsyncIo &api = async; + fprintf(stderr, "async_io impl: %s\n", api.get_impl_spec().c_str()); } TEST(AsyncIoTest, run_stuff_in_async_io_context) { auto async = AsyncIo::create(); - auto f1 = make_future(run_loop(*async, 10, 20)); - auto f2 = make_future(run_loop(*async, 20, 30)); - auto f3 = make_future(run_loop(*async, 30, 40)); + auto f1 = make_future(run_loop(async, 10, 20)); + auto f2 = make_future(run_loop(async, 20, 30)); + auto f3 = make_future(run_loop(async, 30, 40)); f1.wait(); f2.wait(); f3.wait(); } +TEST(AsyncIoTest, shutdown_with_self_exiting_coroutine) { + auto async = AsyncIo::create(); + auto f1 = make_future(run_loop(async, 10, 20)); + auto f2 = make_future(run_loop(async, 20, 30)); + self_exiting_run_loop(async.share()); + f1.wait(); + f2.wait(); +} + Lazy<size_t> write_msg(AsyncIo &async, SocketHandle &socket, const vespalib::string &msg) { size_t written = 0; while (written < msg.size()) { @@ -103,8 +120,8 @@ TEST(AsyncIoTest, raw_socket_io) { ServerSocket server_socket("tcp/0"); server_socket.set_blocking(false); auto async = AsyncIo::create(); - auto f1 = make_future(async_server(*async, server_socket)); - auto f2 = make_future(async_client(*async, server_socket)); + auto f1 = make_future(async_server(async, server_socket)); + auto f2 = make_future(async_client(async, server_socket)); f1.wait(); f2.wait(); } diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp index 406debbe1f9..a38092a79fd 100644 --- a/vespalib/src/vespa/vespalib/coro/async_io.cpp +++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp @@ -16,21 +16,17 @@ namespace { using Handle = std::coroutine_handle<>; -template <typename F> -struct await_void { - bool ready; - F on_suspend; - await_void(bool ready_in, F on_suspend_in) - : ready(ready_in), on_suspend(on_suspend_in) {} - bool await_ready() const noexcept { return ready; } - auto await_suspend(Handle handle) const { return on_suspend(handle); } - constexpr void await_resume() noexcept {} -}; -template <typename F> -await_void(bool ready_in, F on_suspend_in) -> await_void<F>; - struct SelectorThread : AsyncIo { + struct awaiter_base { + SelectorThread &self; + awaiter_base(SelectorThread &self_in) noexcept : self(self_in) {} + awaiter_base(const awaiter_base &) = delete; + awaiter_base &operator=(const awaiter_base &) = delete; + awaiter_base(awaiter_base &&) = delete; + awaiter_base &operator=(awaiter_base &&) = delete; + }; + struct FdContext { int _fd; bool _epoll_read; @@ -42,70 +38,105 @@ struct SelectorThread : AsyncIo { _epoll_read(false), _epoll_write(false), _reader(nullptr), _writer(nullptr) {} }; + using ThreadId = std::atomic<std::thread::id>; + std::map<int,FdContext> _state; - std::set<int> _check; - - Selector<FdContext> _selector; - bool _shutdown; - std::thread _thread; - bool _check_queue; - std::vector<Handle> _todo; - std::mutex _lock; - std::vector<Handle> _queue; + std::set<int> _check; + Selector<FdContext> _selector; + bool _shutdown; + std::thread _thread; + ThreadId _thread_id; + bool _check_queue; + std::vector<Handle> _todo; + std::mutex _lock; + std::vector<Handle> _queue; SelectorThread() : _state(), _check(), _selector(), _shutdown(false), - _thread(&SelectorThread::main, this), + _thread(), + _thread_id(std::thread::id()), _check_queue(false), _todo(), _lock(), - _queue() {} + _queue() + { + static_assert(ThreadId::is_always_lock_free); + } + void start() override; void main(); + void init_shutdown() override; + void fini_shutdown() override; ~SelectorThread(); - bool is_my_thread() const { return (std::this_thread::get_id() == _thread.get_id()); } + bool is_my_thread() const { + return (std::this_thread::get_id() == _thread_id.load(std::memory_order_relaxed)); + } auto protect() { return std::lock_guard(_lock); } auto queue_self_unless(bool ready) { - return await_void(ready, - [this](Handle handle) - { - bool need_wakeup = false; - { - auto guard = protect(); - need_wakeup = _queue.empty(); - _queue.push_back(handle); - } - if (need_wakeup) { - _selector.wakeup(); - } - }); + struct awaiter : awaiter_base { + bool ready; + awaiter(SelectorThread &self_in, bool ready_in) noexcept + : awaiter_base(self_in), ready(ready_in) {} + bool await_ready() const noexcept { return ready; } + bool await_resume() const noexcept { return self.is_my_thread(); } + bool await_suspend(Handle handle) __attribute__((noinline)) { + bool need_wakeup = false; + { + auto guard = self.protect(); + if (self._shutdown) { + return false; + } + need_wakeup = self._queue.empty(); + self._queue.push_back(handle); + } + if (need_wakeup) { + self._selector.wakeup(); + } + return true; + } + }; + return awaiter(*this, ready); } auto enter_thread() { return queue_self_unless(is_my_thread()); } auto readable(int fd) { + struct awaiter : awaiter_base { + int fd; + awaiter(SelectorThread &self_in, int fd_in) noexcept + : awaiter_base(self_in), fd(fd_in) {} + bool await_ready() const noexcept { return (fd < 0) || self._shutdown; } + void await_resume() const noexcept {} + void await_suspend(Handle handle) __attribute__((noinline)) { + auto [pos, ignore] = self._state.try_emplace(fd, fd); + FdContext &state = pos->second; + REQUIRE(!state._reader && "conflicting reads detected"); + state._reader = handle; + self._check.insert(state._fd); + } + }; + fprintf(stderr, "await readable(%d)\n", fd); REQUIRE(is_my_thread()); - return await_void((fd < 0), - [this, fd](Handle handle) - { - auto [pos, ignore] = _state.try_emplace(fd, fd); - FdContext &state = pos->second; - REQUIRE(!state._reader && "conflicting reads detected"); - state._reader = handle; - _check.insert(state._fd); - }); + return awaiter(*this, fd); } auto writable(int fd) { + struct awaiter : awaiter_base { + int fd; + awaiter(SelectorThread &self_in, int fd_in) noexcept + : awaiter_base(self_in), fd(fd_in) {} + bool await_ready() const noexcept { return (fd < 0) || self._shutdown; } + void await_resume() const noexcept {} + void await_suspend(Handle handle) __attribute__((noinline)) { + auto [pos, ignore] = self._state.try_emplace(fd, fd); + FdContext &state = pos->second; + REQUIRE(!state._writer && "conflicting reads detected"); + state._writer = handle; + self._check.insert(state._fd); + } + }; + fprintf(stderr, "await writable(%d)\n", fd); REQUIRE(is_my_thread()); - return await_void((fd < 0), - [this, fd](Handle handle) - { - auto [pos, ignore] = _state.try_emplace(fd, fd); - FdContext &state = pos->second; - REQUIRE(!state._writer && "conflicting write detected"); - state._writer = handle; - _check.insert(state._fd); - }); + return awaiter(*this, fd); } void update_epoll_state() { for (int fd: _check) { @@ -119,15 +150,20 @@ struct SelectorThread : AsyncIo { bool read_changed = ctx._epoll_read != bool(ctx._reader); bool write_changed = ctx._epoll_write != bool(ctx._writer); if (read_changed || write_changed) { + fprintf(stderr, "epoll update %d %s %s\n", ctx._fd, + ctx._reader ? "read" : "-", ctx._writer ? "write" : "-"); _selector.update(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer)); } } else { + fprintf(stderr, "epoll add %d %s %s\n", ctx._fd, + ctx._reader ? "read" : "-", ctx._writer ? "write" : "-"); _selector.add(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer)); } ctx._epoll_read = bool(ctx._reader); ctx._epoll_write = bool(ctx._writer); } else { if (was_added) { + fprintf(stderr, "epoll remove %d\n", ctx._fd); _selector.remove(ctx._fd); } _state.erase(pos); @@ -135,6 +171,28 @@ struct SelectorThread : AsyncIo { } _check.clear(); } + void cancel_epoll_state() { + REQUIRE(_shutdown); + _check.clear(); + for (auto &entry: _state) { + FdContext &ctx = entry.second; + const bool was_added = (ctx._epoll_read || ctx._epoll_write); + if (was_added) { + fprintf(stderr, "epoll remove %d (shutdown)\n", ctx._fd); + _selector.remove(ctx._fd); + } + if (ctx._reader) { + auto reader = std::exchange(ctx._reader, nullptr); + reader.resume(); + } + if (ctx._writer) { + auto writer = std::exchange(ctx._writer, nullptr); + writer.resume(); + } + } + _state.clear(); + REQUIRE(_check.empty()); + } void handle_wakeup() { _check_queue = true; } void handle_queue() { if (!_check_queue) { @@ -145,18 +203,26 @@ struct SelectorThread : AsyncIo { auto guard = protect(); std::swap(_todo, _queue); } + fprintf(stderr, "todo list: %zu items\n", _todo.size()); for (auto &&handle: _todo) { handle.resume(); } _todo.clear(); } + void force_handle_queue() { + REQUIRE(_shutdown); + _check_queue = true; + handle_queue(); + } void handle_event(FdContext &ctx, bool read, bool write) { _check.insert(ctx._fd); if (read && ctx._reader) { + fprintf(stderr, "resume readable(%d)\n", ctx._fd); auto reader = std::exchange(ctx._reader, nullptr); reader.resume(); } if (write && ctx._writer) { + fprintf(stderr, "resume writable(%d)\n", ctx._fd); auto writer = std::exchange(ctx._writer, nullptr); writer.resume(); } @@ -165,56 +231,114 @@ struct SelectorThread : AsyncIo { return "selector-thread"; } Lazy<SocketHandle> accept(ServerSocket &server_socket) override { - co_await enter_thread(); - co_await readable(server_socket.get_fd()); - co_return server_socket.accept(); + fprintf(stderr, "async accept(%d)\n", server_socket.get_fd()); + bool in_my_thread = co_await enter_thread(); + if (in_my_thread) { + co_await readable(server_socket.get_fd()); + if (!_shutdown) { + co_return server_socket.accept(); + } + } + co_return SocketHandle(-ECANCELED); } Lazy<SocketHandle> connect(const SocketAddress &addr) override { - co_await enter_thread(); - auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); }; - auto socket = addr.connect(tweak); - co_await writable(socket.get()); - co_return std::move(socket); + fprintf(stderr, "async connect(%s)\n", addr.spec().c_str()); + bool in_my_thread = co_await enter_thread(); + if (in_my_thread) { + auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); }; + auto socket = addr.connect(tweak); + co_await writable(socket.get()); + if (!_shutdown) { + co_return std::move(socket); + } + } + co_return SocketHandle(-ECANCELED); } Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override { - co_await enter_thread(); - co_await readable(socket.get()); - co_return socket.read(buf, len); + fprintf(stderr, "async read(%d)\n", socket.get()); + bool in_my_thread = co_await enter_thread(); + if (in_my_thread) { + co_await readable(socket.get()); + if (!_shutdown) { + ssize_t res = socket.read(buf, len); + co_return (res < 0) ? -errno : res; + } + } + co_return -ECANCELED; } Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override { - co_await enter_thread(); - co_await writable(socket.get()); - co_return socket.write(buf, len); + fprintf(stderr, "async write(%d)\n", socket.get()); + bool in_my_thread = co_await enter_thread(); + if (in_my_thread) { + co_await writable(socket.get()); + if (!_shutdown) { + ssize_t res = socket.write(buf, len); + co_return (res < 0) ? -errno : res; + } + } + co_return -ECANCELED; } - Work schedule() override { - co_await queue_self_unless(false); - co_return Done{}; + Lazy<bool> schedule() override { + co_return co_await queue_self_unless(false); } - Detached shutdown() { - co_await enter_thread(); + Detached async_shutdown() { + bool in_my_thread = co_await enter_thread(); + REQUIRE(in_my_thread && "unable to initialize shutdown of internal thread"); { auto guard = protect(); _shutdown = true; + _thread_id = std::thread::id(); } } }; void +SelectorThread::start() +{ + fprintf(stderr, "start\n"); + _thread = std::thread(&SelectorThread::main, this); + _thread_id.wait(std::thread::id()); +} + +void SelectorThread::main() { - const int ms_timeout = 100; + _thread_id = std::this_thread::get_id(); + _thread_id.notify_all(); while (!_shutdown) { update_epoll_state(); - _selector.poll(ms_timeout); + fprintf(stderr, "--> epoll wait\n"); + _selector.poll(1000); + fprintf(stderr, "<-- epoll wait: got %zu events\n", _selector.num_events()); _selector.dispatch(*this); handle_queue(); } + fprintf(stderr, "event loop cleanup\n"); + cancel_epoll_state(); + force_handle_queue(); +} + +void +SelectorThread::init_shutdown() +{ + fprintf(stderr, "init_shutdown\n"); + async_shutdown(); } -SelectorThread::~SelectorThread() { - shutdown(); - REQUIRE(!is_my_thread()); +void +SelectorThread::fini_shutdown() +{ + fprintf(stderr, "--> fini_shutdown\n"); _thread.join(); + fprintf(stderr, "<-- fini_shutdown\n"); +} + +SelectorThread::~SelectorThread() +{ + REQUIRE(_state.empty()); + REQUIRE(_check.empty()); + REQUIRE(_todo.empty()); + REQUIRE(_queue.empty()); } } @@ -222,9 +346,45 @@ SelectorThread::~SelectorThread() { AsyncIo::~AsyncIo() = default; AsyncIo::AsyncIo() = default; -std::shared_ptr<AsyncIo> +AsyncIo::Owner::Owner(std::shared_ptr<AsyncIo> async_io) + : _async_io(std::move(async_io)), + _init_shutdown_called(false), + _fini_shutdown_called(false) +{ + _async_io->start(); +} + +void +AsyncIo::Owner::init_shutdown() +{ + if (!_init_shutdown_called) { + if (_async_io) { + _async_io->init_shutdown(); + } + _init_shutdown_called = true; + } +} + +void +AsyncIo::Owner::fini_shutdown() +{ + if (!_fini_shutdown_called) { + init_shutdown(); + if (_async_io) { + _async_io->fini_shutdown(); + } + _fini_shutdown_called = true; + } +} + +AsyncIo::Owner::~Owner() +{ + fini_shutdown(); +} + +AsyncIo::Owner AsyncIo::create() { - return std::make_shared<SelectorThread>(); + return Owner(std::make_shared<SelectorThread>()); } } diff --git a/vespalib/src/vespa/vespalib/coro/async_io.h b/vespalib/src/vespa/vespalib/coro/async_io.h index 91449780789..72e2ef3a312 100644 --- a/vespalib/src/vespa/vespalib/coro/async_io.h +++ b/vespalib/src/vespa/vespalib/coro/async_io.h @@ -11,7 +11,7 @@ namespace vespalib::coro { -// Interfaces defining functions used to perform async io. The initial +// Interface defining functions used to perform async io. The initial // implementation will perform epoll in a single dedicated thread. The // idea is to be able to switch to an implementation using io_uring // some time in the future without having to change existing client @@ -24,9 +24,29 @@ struct AsyncIo : std::enable_shared_from_this<AsyncIo> { AsyncIo &operator=(const AsyncIo &) = delete; AsyncIo &operator=(AsyncIo &&) = delete; virtual ~AsyncIo(); - - // create an async_io 'runtime' with the default implementation - static std::shared_ptr<AsyncIo> create(); + using SP = std::shared_ptr<AsyncIo>; + + // thin wrapper used by the owner to handle lifetime + class Owner { + private: + std::shared_ptr<AsyncIo> _async_io; + bool _init_shutdown_called; + bool _fini_shutdown_called; + public: + Owner(std::shared_ptr<AsyncIo> async_io); + Owner(const Owner &) = delete; + Owner &operator=(const Owner &) = delete; + Owner(Owner &&) = default; + Owner &operator=(Owner &&) = default; + AsyncIo::SP share() { return _async_io->shared_from_this(); } + operator AsyncIo &() { return *_async_io; } + void init_shutdown(); + void fini_shutdown(); + ~Owner(); + }; + + // create an async_io 'runtime' + static Owner create(); // implementation tag virtual vespalib::string get_impl_spec() = 0; @@ -36,11 +56,17 @@ struct AsyncIo : std::enable_shared_from_this<AsyncIo> { virtual Lazy<SocketHandle> connect(const SocketAddress &addr) = 0; virtual Lazy<ssize_t> read(SocketHandle &handle, char *buf, size_t len) = 0; virtual Lazy<ssize_t> write(SocketHandle &handle, const char *buf, size_t len) = 0; - virtual Work schedule() = 0; + virtual Lazy<bool> schedule() = 0; protected: // may only be created via subclass AsyncIo(); + +private: + // called by Owner + virtual void start() = 0; + virtual void init_shutdown() = 0; + virtual void fini_shutdown() = 0; }; } |