diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2023-02-01 11:15:54 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2023-02-02 12:55:53 +0000 |
commit | 1b14946131aac874e69719c0df7e2c46f4e83ab7 (patch) | |
tree | 0d320d5e0657edff621749c9040e7c61a95414c0 /vespalib | |
parent | 2754ecf502e84e1b4d9fe479d1f68ab1d6194882 (diff) |
make clang happy
Avoid checking thread id more than once in a corotuines, since clang
will assume it cannot change. Also set return values from the outside
of the coroutine instead of trying to infer it from the inside.
Fixed a race in async_run fuctions ('this' is not safe to use after
giving the coroutine away). This goes for all compilers.
Minor fix to io_uring connect to avoid returning valid non-connected
socket if connect was called after AsyncIo shutdown.
Stop using std::move when co_returning local move-only values. clang
15 likes this, but it might not make all compilers more happy...
Diffstat (limited to 'vespalib')
5 files changed, 114 insertions, 114 deletions
diff --git a/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp b/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp index c6678657c34..a43ef952c60 100644 --- a/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp +++ b/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp @@ -47,7 +47,7 @@ Lazy<int> wait_for_value(Service &service) { template <typename T> Lazy<T> wait_for_fun(auto &&fun) { T result = co_await wait_for<T>(fun); - co_return std::move(result); + co_return result; } TEST(WaitingForTest, wait_for_external_async_int) { diff --git a/vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp b/vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp index 81b53102b4e..51061c98428 100644 --- a/vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp +++ b/vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp @@ -221,7 +221,7 @@ Lazy<AsyncCryptoSocket::UP> accept_maybe_tls(AsyncIo &async, AbstractTlsCryptoEn } else { auto plain_socket = std::make_unique<SnoopedRawSocket>(async, std::move(handle)); plain_socket->inject_data(buf, snooped); - co_return std::move(plain_socket); + co_return plain_socket; } } diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp index 86eddfbd035..259850c0370 100644 --- a/vespalib/src/vespa/vespalib/coro/async_io.cpp +++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp @@ -30,29 +30,20 @@ namespace vespalib::coro { namespace { -using Handle = std::coroutine_handle<>; - struct SelectorThread : AsyncIo { - struct awaiter_base { - SelectorThread &self; - awaiter_base(SelectorThread &self_in) noexcept : self(self_in) {} - awaiter_base(const awaiter_base &) = delete; - awaiter_base &operator=(const awaiter_base &) = delete; - awaiter_base(awaiter_base &&) = delete; - awaiter_base &operator=(awaiter_base &&) = delete; - }; + using BoolOp = WaitingFor<bool>; struct FdContext { int _fd; bool _epoll_read; bool _epoll_write; - Handle _reader; - Handle _writer; + BoolOp _reader; + BoolOp _writer; FdContext(int fd_in) : _fd(fd_in), _epoll_read(false), _epoll_write(false), - _reader(nullptr), _writer(nullptr) {} + _reader(), _writer() {} }; struct RunGuard; using ThreadId = std::atomic<std::thread::id>; @@ -63,9 +54,9 @@ struct SelectorThread : AsyncIo { std::thread _thread; ThreadId _thread_id; bool _check_queue; - std::vector<Handle> _todo; + std::vector<BoolOp> _todo; std::mutex _lock; - std::vector<Handle> _queue; + std::vector<BoolOp> _queue; SelectorThread() : _state(), @@ -95,67 +86,56 @@ struct SelectorThread : AsyncIo { return (std::this_thread::get_id() == _thread_id.load(std::memory_order_relaxed)); } auto protect() { return std::lock_guard(_lock); } - auto queue_self_unless(bool ready) { - struct awaiter : awaiter_base { - bool ready; - awaiter(SelectorThread &self_in, bool ready_in) noexcept - : awaiter_base(self_in), ready(ready_in) {} - bool await_ready() const noexcept { return ready; } - bool await_resume() const noexcept { return self.in_thread(); } - bool await_suspend(Handle handle) __attribute__((noinline)) { - bool need_wakeup = false; - { - auto guard = self.protect(); - if (self.stopped()) { - return false; - } - need_wakeup = self._queue.empty(); - self._queue.push_back(handle); - } - if (need_wakeup) { - self._selector.wakeup(); - } - return true; - } - }; - return awaiter(*this, ready); + auto async_run() { + return wait_for<bool>([this](auto wf) + { + AsyncIo::SP wakeup_guard; + { + auto guard = protect(); + if (stopped()) { + wf.set_value(false); + return wf.mu(); + } + if (_queue.empty()) { + wakeup_guard = shared_from_this(); + } + _queue.push_back(std::move(wf)); + } + if (wakeup_guard) { + _selector.wakeup(); + } + return wf.nop(); + }); } - auto enter_thread() { return queue_self_unless(in_thread()); } auto readable(int fd) { - struct awaiter : awaiter_base { - int fd; - awaiter(SelectorThread &self_in, int fd_in) noexcept - : awaiter_base(self_in), fd(fd_in) {} - bool await_ready() const noexcept { return (fd < 0) || self.stopped(); } - bool await_resume() const noexcept { return self.running(); } - void await_suspend(Handle handle) __attribute__((noinline)) { - auto [pos, ignore] = self._state.try_emplace(fd, fd); - FdContext &state = pos->second; - REQUIRE(!state._reader && "conflicting reads detected"); - state._reader = handle; - self._check.insert(state._fd); - } - }; - REQUIRE(in_thread()); - return awaiter(*this, fd); + return wait_for<bool>([fd, this](auto wf) + { + if ((fd < 0) || stopped()) { + wf.set_value(false); + return wf.mu(); + } + auto [pos, ignore] = _state.try_emplace(fd, fd); + FdContext &state = pos->second; + REQUIRE(!state._reader && "conflicting reads detected"); + state._reader = std::move(wf); + _check.insert(state._fd); + return wf.nop(); + }); } auto writable(int fd) { - struct awaiter : awaiter_base { - int fd; - awaiter(SelectorThread &self_in, int fd_in) noexcept - : awaiter_base(self_in), fd(fd_in) {} - bool await_ready() const noexcept { return (fd < 0) || self.stopped(); } - bool await_resume() const noexcept { return self.running(); } - void await_suspend(Handle handle) __attribute__((noinline)) { - auto [pos, ignore] = self._state.try_emplace(fd, fd); - FdContext &state = pos->second; - REQUIRE(!state._writer && "conflicting reads detected"); - state._writer = handle; - self._check.insert(state._fd); - } - }; - REQUIRE(in_thread()); - return awaiter(*this, fd); + return wait_for<bool>([fd,this](auto wf) + { + if ((fd < 0) || stopped()) { + wf.set_value(false); + return wf.mu(); + } + auto [pos, ignore] = _state.try_emplace(fd, fd); + FdContext &state = pos->second; + REQUIRE(!state._writer && "conflicting writes detected"); + state._writer = std::move(wf); + _check.insert(state._fd); + return wf.nop(); + }); } void update_epoll_state() { for (int fd: _check) { @@ -186,7 +166,7 @@ struct SelectorThread : AsyncIo { _check.clear(); } void handle_wakeup() { _check_queue = true; } - void handle_queue() { + void handle_queue(bool result) { if (!_check_queue) { return; } @@ -195,26 +175,27 @@ struct SelectorThread : AsyncIo { auto guard = protect(); std::swap(_todo, _queue); } - for (auto &&handle: _todo) { - handle.resume(); + for (auto &&entry: _todo) { + auto wf = std::move(entry); + wf.set_value(result); } _todo.clear(); } void handle_event(FdContext &ctx, bool read, bool write) { _check.insert(ctx._fd); if (read && ctx._reader) { - auto reader = std::exchange(ctx._reader, nullptr); - reader.resume(); + auto reader = std::move(ctx._reader); + reader.set_value(true); } if (write && ctx._writer) { - auto writer = std::exchange(ctx._writer, nullptr); - writer.resume(); + auto writer = std::move(ctx._writer); + writer.set_value(true); } } ImplTag get_impl_tag() override { return ImplTag::EPOLL; } Lazy<SocketHandle> accept(ServerSocket &server_socket) override { - bool in_thread = co_await enter_thread(); - if (in_thread) { + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { bool can_read = co_await readable(server_socket.get_fd()); if (can_read) { auto res = server_socket.accept(); @@ -227,20 +208,20 @@ struct SelectorThread : AsyncIo { co_return SocketHandle(-ECANCELED); } Lazy<SocketHandle> connect(const SocketAddress &addr) override { - bool in_thread = co_await enter_thread(); - if (in_thread) { + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); }; auto socket = addr.connect(tweak); bool can_write = co_await writable(socket.get()); if (can_write) { - co_return std::move(socket); + co_return socket; } } co_return SocketHandle(-ECANCELED); } Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override { - bool in_thread = co_await enter_thread(); - if (in_thread) { + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { bool can_read = co_await readable(socket.get()); if (can_read) { ssize_t res = socket.read(buf, len); @@ -250,8 +231,8 @@ struct SelectorThread : AsyncIo { co_return -ECANCELED; } Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override { - bool in_thread = co_await enter_thread(); - if (in_thread) { + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { bool can_write = co_await writable(socket.get()); if (can_write) { ssize_t res = socket.write(buf, len); @@ -261,11 +242,11 @@ struct SelectorThread : AsyncIo { co_return -ECANCELED; } Lazy<bool> schedule() override { - co_return co_await queue_self_unless(false); + co_return co_await async_run(); } Detached async_shutdown() { - bool in_thread = co_await enter_thread(); - REQUIRE(in_thread && "unable to initialize shutdown of internal thread"); + bool inside = co_await async_run(); + REQUIRE(inside && "unable to initialize shutdown of internal thread"); { auto guard = protect(); _thread_id = std::thread::id(); @@ -296,18 +277,18 @@ struct SelectorThread::RunGuard { self._selector.remove(ctx._fd); } if (ctx._reader) { - auto reader = std::exchange(ctx._reader, nullptr); - reader.resume(); + auto reader = std::move(ctx._reader); + reader.set_value(false); } if (ctx._writer) { - auto writer = std::exchange(ctx._writer, nullptr); - writer.resume(); + auto writer = std::move(ctx._writer); + writer.set_value(false); } } self._state.clear(); REQUIRE(self._check.empty()); self._check_queue = true; - self.handle_queue(); + self.handle_queue(false); } }; @@ -319,7 +300,7 @@ SelectorThread::main() update_epoll_state(); _selector.poll(1000); _selector.dispatch(*this); - handle_queue(); + handle_queue(true); } } diff --git a/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp b/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp index 143e65d02d4..a4d6e85c03e 100644 --- a/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp +++ b/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp @@ -142,22 +142,23 @@ struct IoUringThread : AsyncIo { } auto async_run() { return wait_for<bool>([this](auto wf) - ->std::coroutine_handle<> { - bool need_wakeup = false; + AsyncIo::SP wakeup_guard; { auto guard = protect(); if (stopped()) { wf.set_value(false); return wf.mu(); } - need_wakeup = _queue.empty(); + if (_queue.empty()) { + wakeup_guard = shared_from_this(); + } _queue.push_back(std::move(wf)); } - if (need_wakeup) { + if (wakeup_guard) { wakeup(); } - return std::noop_coroutine(); + return wf.nop(); }); } void handle_queue(bool result) { @@ -173,10 +174,10 @@ struct IoUringThread : AsyncIo { } ImplTag get_impl_tag() override { return ImplTag::URING; } Lazy<SocketHandle> accept(ServerSocket &server_socket) override { - BlockingGuard blocking_guard(server_socket.get_fd()); int res = -ECANCELED; bool inside = in_thread() ? true : co_await async_run(); if (inside) { + BlockingGuard blocking_guard(server_socket.get_fd()); auto *sqe = _uring.get_sqe(); io_uring_prep_accept(sqe, server_socket.get_fd(), nullptr, nullptr, 0); res = co_await wait_for_sqe(sqe); @@ -184,19 +185,20 @@ struct IoUringThread : AsyncIo { co_return SocketHandle(res); } Lazy<SocketHandle> connect(const SocketAddress &addr) override { - SocketHandle handle = addr.raw_socket(); - if (handle.valid()) { - bool inside = in_thread() ? true : co_await async_run(); - if (inside) { + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { + SocketHandle handle = addr.raw_socket(); + if (handle.valid()) { auto *sqe = _uring.get_sqe(); io_uring_prep_connect(sqe, handle.get(), addr.raw_addr(), addr.raw_addr_len()); - int res = co_await wait_for_sqe(sqe); + auto res = co_await wait_for_sqe(sqe); if (res < 0) { handle.reset(res); } } + co_return handle; } - co_return handle; + co_return SocketHandle(-ECANCELED); } Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override { ssize_t res = -ECANCELED; diff --git a/vespalib/src/vespa/vespalib/coro/waiting_for.h b/vespalib/src/vespa/vespalib/coro/waiting_for.h index d82d0779f41..7ded6211200 100644 --- a/vespalib/src/vespa/vespalib/coro/waiting_for.h +++ b/vespalib/src/vespa/vespalib/coro/waiting_for.h @@ -34,10 +34,20 @@ private: PromiseState<T> *_state; WaitingFor(PromiseState<T> *state) noexcept : _state(state) {} public: + WaitingFor() noexcept : _state(nullptr) {} WaitingFor(WaitingFor &&rhs) noexcept : _state(std::exchange(rhs._state, nullptr)) {} - WaitingFor(WaitingFor &rhs) = delete; - WaitingFor &operator=(WaitingFor &rhs) = delete; + WaitingFor &operator=(WaitingFor &&rhs) { + if (_state) { + _state->result.set_done(); // canceled + _state->waiter.resume(); + } + _state = std::exchange(rhs._state, nullptr); + return *this; + } + WaitingFor(const WaitingFor &rhs) = delete; + WaitingFor &operator=(const WaitingFor &rhs) = delete; ~WaitingFor(); + operator bool() const noexcept { return _state; } template <typename RET> void set_value(RET &&value) { _state->result.set_value(std::forward<RET>(value)); @@ -70,12 +80,19 @@ public: _state = nullptr; return handle; } + // If some branch in the async start function wants to return mu, + // other branches can return nop. This is to help the compiler + // figure out the return type of lambdas, since + // std::noop_coroutine() is a distinct type. + [[nodiscard]] static std::coroutine_handle<> nop() noexcept { + return std::noop_coroutine(); + } }; template <typename T> WaitingFor<T>::~WaitingFor() { - if (_state != nullptr) { + if (_state) { _state->waiter.resume(); } } |