summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-02-01 11:15:54 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-02-02 12:55:53 +0000
commit1b14946131aac874e69719c0df7e2c46f4e83ab7 (patch)
tree0d320d5e0657edff621749c9040e7c61a95414c0 /vespalib
parent2754ecf502e84e1b4d9fe479d1f68ab1d6194882 (diff)
make clang happy
Avoid checking thread id more than once in a corotuines, since clang will assume it cannot change. Also set return values from the outside of the coroutine instead of trying to infer it from the inside. Fixed a race in async_run fuctions ('this' is not safe to use after giving the coroutine away). This goes for all compilers. Minor fix to io_uring connect to avoid returning valid non-connected socket if connect was called after AsyncIo shutdown. Stop using std::move when co_returning local move-only values. clang 15 likes this, but it might not make all compilers more happy...
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp2
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp2
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_io.cpp175
-rw-r--r--vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp26
-rw-r--r--vespalib/src/vespa/vespalib/coro/waiting_for.h23
5 files changed, 114 insertions, 114 deletions
diff --git a/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp b/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp
index c6678657c34..a43ef952c60 100644
--- a/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp
+++ b/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp
@@ -47,7 +47,7 @@ Lazy<int> wait_for_value(Service &service) {
template <typename T>
Lazy<T> wait_for_fun(auto &&fun) {
T result = co_await wait_for<T>(fun);
- co_return std::move(result);
+ co_return result;
}
TEST(WaitingForTest, wait_for_external_async_int) {
diff --git a/vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp b/vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp
index 81b53102b4e..51061c98428 100644
--- a/vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp
+++ b/vespalib/src/vespa/vespalib/coro/async_crypto_socket.cpp
@@ -221,7 +221,7 @@ Lazy<AsyncCryptoSocket::UP> accept_maybe_tls(AsyncIo &async, AbstractTlsCryptoEn
} else {
auto plain_socket = std::make_unique<SnoopedRawSocket>(async, std::move(handle));
plain_socket->inject_data(buf, snooped);
- co_return std::move(plain_socket);
+ co_return plain_socket;
}
}
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp
index 86eddfbd035..259850c0370 100644
--- a/vespalib/src/vespa/vespalib/coro/async_io.cpp
+++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp
@@ -30,29 +30,20 @@ namespace vespalib::coro {
namespace {
-using Handle = std::coroutine_handle<>;
-
struct SelectorThread : AsyncIo {
- struct awaiter_base {
- SelectorThread &self;
- awaiter_base(SelectorThread &self_in) noexcept : self(self_in) {}
- awaiter_base(const awaiter_base &) = delete;
- awaiter_base &operator=(const awaiter_base &) = delete;
- awaiter_base(awaiter_base &&) = delete;
- awaiter_base &operator=(awaiter_base &&) = delete;
- };
+ using BoolOp = WaitingFor<bool>;
struct FdContext {
int _fd;
bool _epoll_read;
bool _epoll_write;
- Handle _reader;
- Handle _writer;
+ BoolOp _reader;
+ BoolOp _writer;
FdContext(int fd_in)
: _fd(fd_in),
_epoll_read(false), _epoll_write(false),
- _reader(nullptr), _writer(nullptr) {}
+ _reader(), _writer() {}
};
struct RunGuard;
using ThreadId = std::atomic<std::thread::id>;
@@ -63,9 +54,9 @@ struct SelectorThread : AsyncIo {
std::thread _thread;
ThreadId _thread_id;
bool _check_queue;
- std::vector<Handle> _todo;
+ std::vector<BoolOp> _todo;
std::mutex _lock;
- std::vector<Handle> _queue;
+ std::vector<BoolOp> _queue;
SelectorThread()
: _state(),
@@ -95,67 +86,56 @@ struct SelectorThread : AsyncIo {
return (std::this_thread::get_id() == _thread_id.load(std::memory_order_relaxed));
}
auto protect() { return std::lock_guard(_lock); }
- auto queue_self_unless(bool ready) {
- struct awaiter : awaiter_base {
- bool ready;
- awaiter(SelectorThread &self_in, bool ready_in) noexcept
- : awaiter_base(self_in), ready(ready_in) {}
- bool await_ready() const noexcept { return ready; }
- bool await_resume() const noexcept { return self.in_thread(); }
- bool await_suspend(Handle handle) __attribute__((noinline)) {
- bool need_wakeup = false;
- {
- auto guard = self.protect();
- if (self.stopped()) {
- return false;
- }
- need_wakeup = self._queue.empty();
- self._queue.push_back(handle);
- }
- if (need_wakeup) {
- self._selector.wakeup();
- }
- return true;
- }
- };
- return awaiter(*this, ready);
+ auto async_run() {
+ return wait_for<bool>([this](auto wf)
+ {
+ AsyncIo::SP wakeup_guard;
+ {
+ auto guard = protect();
+ if (stopped()) {
+ wf.set_value(false);
+ return wf.mu();
+ }
+ if (_queue.empty()) {
+ wakeup_guard = shared_from_this();
+ }
+ _queue.push_back(std::move(wf));
+ }
+ if (wakeup_guard) {
+ _selector.wakeup();
+ }
+ return wf.nop();
+ });
}
- auto enter_thread() { return queue_self_unless(in_thread()); }
auto readable(int fd) {
- struct awaiter : awaiter_base {
- int fd;
- awaiter(SelectorThread &self_in, int fd_in) noexcept
- : awaiter_base(self_in), fd(fd_in) {}
- bool await_ready() const noexcept { return (fd < 0) || self.stopped(); }
- bool await_resume() const noexcept { return self.running(); }
- void await_suspend(Handle handle) __attribute__((noinline)) {
- auto [pos, ignore] = self._state.try_emplace(fd, fd);
- FdContext &state = pos->second;
- REQUIRE(!state._reader && "conflicting reads detected");
- state._reader = handle;
- self._check.insert(state._fd);
- }
- };
- REQUIRE(in_thread());
- return awaiter(*this, fd);
+ return wait_for<bool>([fd, this](auto wf)
+ {
+ if ((fd < 0) || stopped()) {
+ wf.set_value(false);
+ return wf.mu();
+ }
+ auto [pos, ignore] = _state.try_emplace(fd, fd);
+ FdContext &state = pos->second;
+ REQUIRE(!state._reader && "conflicting reads detected");
+ state._reader = std::move(wf);
+ _check.insert(state._fd);
+ return wf.nop();
+ });
}
auto writable(int fd) {
- struct awaiter : awaiter_base {
- int fd;
- awaiter(SelectorThread &self_in, int fd_in) noexcept
- : awaiter_base(self_in), fd(fd_in) {}
- bool await_ready() const noexcept { return (fd < 0) || self.stopped(); }
- bool await_resume() const noexcept { return self.running(); }
- void await_suspend(Handle handle) __attribute__((noinline)) {
- auto [pos, ignore] = self._state.try_emplace(fd, fd);
- FdContext &state = pos->second;
- REQUIRE(!state._writer && "conflicting reads detected");
- state._writer = handle;
- self._check.insert(state._fd);
- }
- };
- REQUIRE(in_thread());
- return awaiter(*this, fd);
+ return wait_for<bool>([fd,this](auto wf)
+ {
+ if ((fd < 0) || stopped()) {
+ wf.set_value(false);
+ return wf.mu();
+ }
+ auto [pos, ignore] = _state.try_emplace(fd, fd);
+ FdContext &state = pos->second;
+ REQUIRE(!state._writer && "conflicting writes detected");
+ state._writer = std::move(wf);
+ _check.insert(state._fd);
+ return wf.nop();
+ });
}
void update_epoll_state() {
for (int fd: _check) {
@@ -186,7 +166,7 @@ struct SelectorThread : AsyncIo {
_check.clear();
}
void handle_wakeup() { _check_queue = true; }
- void handle_queue() {
+ void handle_queue(bool result) {
if (!_check_queue) {
return;
}
@@ -195,26 +175,27 @@ struct SelectorThread : AsyncIo {
auto guard = protect();
std::swap(_todo, _queue);
}
- for (auto &&handle: _todo) {
- handle.resume();
+ for (auto &&entry: _todo) {
+ auto wf = std::move(entry);
+ wf.set_value(result);
}
_todo.clear();
}
void handle_event(FdContext &ctx, bool read, bool write) {
_check.insert(ctx._fd);
if (read && ctx._reader) {
- auto reader = std::exchange(ctx._reader, nullptr);
- reader.resume();
+ auto reader = std::move(ctx._reader);
+ reader.set_value(true);
}
if (write && ctx._writer) {
- auto writer = std::exchange(ctx._writer, nullptr);
- writer.resume();
+ auto writer = std::move(ctx._writer);
+ writer.set_value(true);
}
}
ImplTag get_impl_tag() override { return ImplTag::EPOLL; }
Lazy<SocketHandle> accept(ServerSocket &server_socket) override {
- bool in_thread = co_await enter_thread();
- if (in_thread) {
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
bool can_read = co_await readable(server_socket.get_fd());
if (can_read) {
auto res = server_socket.accept();
@@ -227,20 +208,20 @@ struct SelectorThread : AsyncIo {
co_return SocketHandle(-ECANCELED);
}
Lazy<SocketHandle> connect(const SocketAddress &addr) override {
- bool in_thread = co_await enter_thread();
- if (in_thread) {
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); };
auto socket = addr.connect(tweak);
bool can_write = co_await writable(socket.get());
if (can_write) {
- co_return std::move(socket);
+ co_return socket;
}
}
co_return SocketHandle(-ECANCELED);
}
Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override {
- bool in_thread = co_await enter_thread();
- if (in_thread) {
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
bool can_read = co_await readable(socket.get());
if (can_read) {
ssize_t res = socket.read(buf, len);
@@ -250,8 +231,8 @@ struct SelectorThread : AsyncIo {
co_return -ECANCELED;
}
Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override {
- bool in_thread = co_await enter_thread();
- if (in_thread) {
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
bool can_write = co_await writable(socket.get());
if (can_write) {
ssize_t res = socket.write(buf, len);
@@ -261,11 +242,11 @@ struct SelectorThread : AsyncIo {
co_return -ECANCELED;
}
Lazy<bool> schedule() override {
- co_return co_await queue_self_unless(false);
+ co_return co_await async_run();
}
Detached async_shutdown() {
- bool in_thread = co_await enter_thread();
- REQUIRE(in_thread && "unable to initialize shutdown of internal thread");
+ bool inside = co_await async_run();
+ REQUIRE(inside && "unable to initialize shutdown of internal thread");
{
auto guard = protect();
_thread_id = std::thread::id();
@@ -296,18 +277,18 @@ struct SelectorThread::RunGuard {
self._selector.remove(ctx._fd);
}
if (ctx._reader) {
- auto reader = std::exchange(ctx._reader, nullptr);
- reader.resume();
+ auto reader = std::move(ctx._reader);
+ reader.set_value(false);
}
if (ctx._writer) {
- auto writer = std::exchange(ctx._writer, nullptr);
- writer.resume();
+ auto writer = std::move(ctx._writer);
+ writer.set_value(false);
}
}
self._state.clear();
REQUIRE(self._check.empty());
self._check_queue = true;
- self.handle_queue();
+ self.handle_queue(false);
}
};
@@ -319,7 +300,7 @@ SelectorThread::main()
update_epoll_state();
_selector.poll(1000);
_selector.dispatch(*this);
- handle_queue();
+ handle_queue(true);
}
}
diff --git a/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp b/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp
index 143e65d02d4..a4d6e85c03e 100644
--- a/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp
+++ b/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp
@@ -142,22 +142,23 @@ struct IoUringThread : AsyncIo {
}
auto async_run() {
return wait_for<bool>([this](auto wf)
- ->std::coroutine_handle<>
{
- bool need_wakeup = false;
+ AsyncIo::SP wakeup_guard;
{
auto guard = protect();
if (stopped()) {
wf.set_value(false);
return wf.mu();
}
- need_wakeup = _queue.empty();
+ if (_queue.empty()) {
+ wakeup_guard = shared_from_this();
+ }
_queue.push_back(std::move(wf));
}
- if (need_wakeup) {
+ if (wakeup_guard) {
wakeup();
}
- return std::noop_coroutine();
+ return wf.nop();
});
}
void handle_queue(bool result) {
@@ -173,10 +174,10 @@ struct IoUringThread : AsyncIo {
}
ImplTag get_impl_tag() override { return ImplTag::URING; }
Lazy<SocketHandle> accept(ServerSocket &server_socket) override {
- BlockingGuard blocking_guard(server_socket.get_fd());
int res = -ECANCELED;
bool inside = in_thread() ? true : co_await async_run();
if (inside) {
+ BlockingGuard blocking_guard(server_socket.get_fd());
auto *sqe = _uring.get_sqe();
io_uring_prep_accept(sqe, server_socket.get_fd(), nullptr, nullptr, 0);
res = co_await wait_for_sqe(sqe);
@@ -184,19 +185,20 @@ struct IoUringThread : AsyncIo {
co_return SocketHandle(res);
}
Lazy<SocketHandle> connect(const SocketAddress &addr) override {
- SocketHandle handle = addr.raw_socket();
- if (handle.valid()) {
- bool inside = in_thread() ? true : co_await async_run();
- if (inside) {
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
+ SocketHandle handle = addr.raw_socket();
+ if (handle.valid()) {
auto *sqe = _uring.get_sqe();
io_uring_prep_connect(sqe, handle.get(), addr.raw_addr(), addr.raw_addr_len());
- int res = co_await wait_for_sqe(sqe);
+ auto res = co_await wait_for_sqe(sqe);
if (res < 0) {
handle.reset(res);
}
}
+ co_return handle;
}
- co_return handle;
+ co_return SocketHandle(-ECANCELED);
}
Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override {
ssize_t res = -ECANCELED;
diff --git a/vespalib/src/vespa/vespalib/coro/waiting_for.h b/vespalib/src/vespa/vespalib/coro/waiting_for.h
index d82d0779f41..7ded6211200 100644
--- a/vespalib/src/vespa/vespalib/coro/waiting_for.h
+++ b/vespalib/src/vespa/vespalib/coro/waiting_for.h
@@ -34,10 +34,20 @@ private:
PromiseState<T> *_state;
WaitingFor(PromiseState<T> *state) noexcept : _state(state) {}
public:
+ WaitingFor() noexcept : _state(nullptr) {}
WaitingFor(WaitingFor &&rhs) noexcept : _state(std::exchange(rhs._state, nullptr)) {}
- WaitingFor(WaitingFor &rhs) = delete;
- WaitingFor &operator=(WaitingFor &rhs) = delete;
+ WaitingFor &operator=(WaitingFor &&rhs) {
+ if (_state) {
+ _state->result.set_done(); // canceled
+ _state->waiter.resume();
+ }
+ _state = std::exchange(rhs._state, nullptr);
+ return *this;
+ }
+ WaitingFor(const WaitingFor &rhs) = delete;
+ WaitingFor &operator=(const WaitingFor &rhs) = delete;
~WaitingFor();
+ operator bool() const noexcept { return _state; }
template <typename RET>
void set_value(RET &&value) {
_state->result.set_value(std::forward<RET>(value));
@@ -70,12 +80,19 @@ public:
_state = nullptr;
return handle;
}
+ // If some branch in the async start function wants to return mu,
+ // other branches can return nop. This is to help the compiler
+ // figure out the return type of lambdas, since
+ // std::noop_coroutine() is a distinct type.
+ [[nodiscard]] static std::coroutine_handle<> nop() noexcept {
+ return std::noop_coroutine();
+ }
};
template <typename T>
WaitingFor<T>::~WaitingFor()
{
- if (_state != nullptr) {
+ if (_state) {
_state->waiter.resume();
}
}