diff options
author | HÃ¥vard Pettersen <3535158+havardpe@users.noreply.github.com> | 2023-01-04 15:06:00 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-04 15:06:00 +0100 |
commit | 8fbcc378b4be4af4bea4aa2136731a61f185d1e7 (patch) | |
tree | 523b778cf5c81d06c6120e3056d5144e2ac862c3 | |
parent | 85e3166919ccff9b15fd1b14b834a6a39634fab4 (diff) | |
parent | c75aa48ef5794b05e70e5fbd5fc9cb4fee0b8fde (diff) |
Merge pull request #25377 from vespa-engine/havardpe/remove-shutdown-flag
remove _shutdown flag
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/async_io.cpp | 154 |
1 files changed, 69 insertions, 85 deletions
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp index a38092a79fd..8d628fd7887 100644 --- a/vespalib/src/vespa/vespalib/coro/async_io.cpp +++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp @@ -38,12 +38,12 @@ struct SelectorThread : AsyncIo { _epoll_read(false), _epoll_write(false), _reader(nullptr), _writer(nullptr) {} }; + struct RunGuard; using ThreadId = std::atomic<std::thread::id>; - + std::map<int,FdContext> _state; std::set<int> _check; Selector<FdContext> _selector; - bool _shutdown; std::thread _thread; ThreadId _thread_id; bool _check_queue; @@ -55,7 +55,6 @@ struct SelectorThread : AsyncIo { : _state(), _check(), _selector(), - _shutdown(false), _thread(), _thread_id(std::thread::id()), _check_queue(false), @@ -70,7 +69,13 @@ struct SelectorThread : AsyncIo { void init_shutdown() override; void fini_shutdown() override; ~SelectorThread(); - bool is_my_thread() const { + bool running() const noexcept { + return (_thread_id.load(std::memory_order_relaxed) != std::thread::id()); + } + bool stopped() const noexcept { + return (_thread_id.load(std::memory_order_relaxed) == std::thread::id()); + } + bool in_thread() const noexcept { return (std::this_thread::get_id() == _thread_id.load(std::memory_order_relaxed)); } auto protect() { return std::lock_guard(_lock); } @@ -80,12 +85,12 @@ struct SelectorThread : AsyncIo { awaiter(SelectorThread &self_in, bool ready_in) noexcept : awaiter_base(self_in), ready(ready_in) {} bool await_ready() const noexcept { return ready; } - bool await_resume() const noexcept { return self.is_my_thread(); } + bool await_resume() const noexcept { return self.in_thread(); } bool await_suspend(Handle handle) __attribute__((noinline)) { bool need_wakeup = false; { auto guard = self.protect(); - if (self._shutdown) { + if (self.stopped()) { return false; } need_wakeup = self._queue.empty(); @@ -99,14 +104,14 @@ struct SelectorThread : AsyncIo { }; return awaiter(*this, ready); } - auto enter_thread() { return queue_self_unless(is_my_thread()); } + auto enter_thread() { return queue_self_unless(in_thread()); } auto readable(int fd) { struct awaiter : awaiter_base { int fd; awaiter(SelectorThread &self_in, int fd_in) noexcept : awaiter_base(self_in), fd(fd_in) {} - bool await_ready() const noexcept { return (fd < 0) || self._shutdown; } - void await_resume() const noexcept {} + bool await_ready() const noexcept { return (fd < 0) || self.stopped(); } + bool await_resume() const noexcept { return self.running(); } void await_suspend(Handle handle) __attribute__((noinline)) { auto [pos, ignore] = self._state.try_emplace(fd, fd); FdContext &state = pos->second; @@ -115,8 +120,7 @@ struct SelectorThread : AsyncIo { self._check.insert(state._fd); } }; - fprintf(stderr, "await readable(%d)\n", fd); - REQUIRE(is_my_thread()); + REQUIRE(in_thread()); return awaiter(*this, fd); } auto writable(int fd) { @@ -124,8 +128,8 @@ struct SelectorThread : AsyncIo { int fd; awaiter(SelectorThread &self_in, int fd_in) noexcept : awaiter_base(self_in), fd(fd_in) {} - bool await_ready() const noexcept { return (fd < 0) || self._shutdown; } - void await_resume() const noexcept {} + bool await_ready() const noexcept { return (fd < 0) || self.stopped(); } + bool await_resume() const noexcept { return self.running(); } void await_suspend(Handle handle) __attribute__((noinline)) { auto [pos, ignore] = self._state.try_emplace(fd, fd); FdContext &state = pos->second; @@ -134,8 +138,7 @@ struct SelectorThread : AsyncIo { self._check.insert(state._fd); } }; - fprintf(stderr, "await writable(%d)\n", fd); - REQUIRE(is_my_thread()); + REQUIRE(in_thread()); return awaiter(*this, fd); } void update_epoll_state() { @@ -150,20 +153,15 @@ struct SelectorThread : AsyncIo { bool read_changed = ctx._epoll_read != bool(ctx._reader); bool write_changed = ctx._epoll_write != bool(ctx._writer); if (read_changed || write_changed) { - fprintf(stderr, "epoll update %d %s %s\n", ctx._fd, - ctx._reader ? "read" : "-", ctx._writer ? "write" : "-"); _selector.update(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer)); } } else { - fprintf(stderr, "epoll add %d %s %s\n", ctx._fd, - ctx._reader ? "read" : "-", ctx._writer ? "write" : "-"); _selector.add(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer)); } ctx._epoll_read = bool(ctx._reader); ctx._epoll_write = bool(ctx._writer); } else { if (was_added) { - fprintf(stderr, "epoll remove %d\n", ctx._fd); _selector.remove(ctx._fd); } _state.erase(pos); @@ -171,28 +169,6 @@ struct SelectorThread : AsyncIo { } _check.clear(); } - void cancel_epoll_state() { - REQUIRE(_shutdown); - _check.clear(); - for (auto &entry: _state) { - FdContext &ctx = entry.second; - const bool was_added = (ctx._epoll_read || ctx._epoll_write); - if (was_added) { - fprintf(stderr, "epoll remove %d (shutdown)\n", ctx._fd); - _selector.remove(ctx._fd); - } - if (ctx._reader) { - auto reader = std::exchange(ctx._reader, nullptr); - reader.resume(); - } - if (ctx._writer) { - auto writer = std::exchange(ctx._writer, nullptr); - writer.resume(); - } - } - _state.clear(); - REQUIRE(_check.empty()); - } void handle_wakeup() { _check_queue = true; } void handle_queue() { if (!_check_queue) { @@ -203,26 +179,18 @@ struct SelectorThread : AsyncIo { auto guard = protect(); std::swap(_todo, _queue); } - fprintf(stderr, "todo list: %zu items\n", _todo.size()); for (auto &&handle: _todo) { handle.resume(); } _todo.clear(); } - void force_handle_queue() { - REQUIRE(_shutdown); - _check_queue = true; - handle_queue(); - } void handle_event(FdContext &ctx, bool read, bool write) { _check.insert(ctx._fd); if (read && ctx._reader) { - fprintf(stderr, "resume readable(%d)\n", ctx._fd); auto reader = std::exchange(ctx._reader, nullptr); reader.resume(); } if (write && ctx._writer) { - fprintf(stderr, "resume writable(%d)\n", ctx._fd); auto writer = std::exchange(ctx._writer, nullptr); writer.resume(); } @@ -231,35 +199,32 @@ struct SelectorThread : AsyncIo { return "selector-thread"; } Lazy<SocketHandle> accept(ServerSocket &server_socket) override { - fprintf(stderr, "async accept(%d)\n", server_socket.get_fd()); - bool in_my_thread = co_await enter_thread(); - if (in_my_thread) { - co_await readable(server_socket.get_fd()); - if (!_shutdown) { + bool in_thread = co_await enter_thread(); + if (in_thread) { + bool can_read = co_await readable(server_socket.get_fd()); + if (can_read) { co_return server_socket.accept(); } } co_return SocketHandle(-ECANCELED); } Lazy<SocketHandle> connect(const SocketAddress &addr) override { - fprintf(stderr, "async connect(%s)\n", addr.spec().c_str()); - bool in_my_thread = co_await enter_thread(); - if (in_my_thread) { + bool in_thread = co_await enter_thread(); + if (in_thread) { auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); }; auto socket = addr.connect(tweak); - co_await writable(socket.get()); - if (!_shutdown) { + bool can_write = co_await writable(socket.get()); + if (can_write) { co_return std::move(socket); } } co_return SocketHandle(-ECANCELED); } Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override { - fprintf(stderr, "async read(%d)\n", socket.get()); - bool in_my_thread = co_await enter_thread(); - if (in_my_thread) { - co_await readable(socket.get()); - if (!_shutdown) { + bool in_thread = co_await enter_thread(); + if (in_thread) { + bool can_read = co_await readable(socket.get()); + if (can_read) { ssize_t res = socket.read(buf, len); co_return (res < 0) ? -errno : res; } @@ -267,11 +232,10 @@ struct SelectorThread : AsyncIo { co_return -ECANCELED; } Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override { - fprintf(stderr, "async write(%d)\n", socket.get()); - bool in_my_thread = co_await enter_thread(); - if (in_my_thread) { - co_await writable(socket.get()); - if (!_shutdown) { + bool in_thread = co_await enter_thread(); + if (in_thread) { + bool can_write = co_await writable(socket.get()); + if (can_write) { ssize_t res = socket.write(buf, len); co_return (res < 0) ? -errno : res; } @@ -282,11 +246,10 @@ struct SelectorThread : AsyncIo { co_return co_await queue_self_unless(false); } Detached async_shutdown() { - bool in_my_thread = co_await enter_thread(); - REQUIRE(in_my_thread && "unable to initialize shutdown of internal thread"); + bool in_thread = co_await enter_thread(); + REQUIRE(in_thread && "unable to initialize shutdown of internal thread"); { auto guard = protect(); - _shutdown = true; _thread_id = std::thread::id(); } } @@ -295,42 +258,63 @@ struct SelectorThread : AsyncIo { void SelectorThread::start() { - fprintf(stderr, "start\n"); _thread = std::thread(&SelectorThread::main, this); _thread_id.wait(std::thread::id()); } +struct SelectorThread::RunGuard { + SelectorThread &self; + RunGuard(SelectorThread &self_in) noexcept : self(self_in) { + self._thread_id = std::this_thread::get_id(); + self._thread_id.notify_all(); + } + ~RunGuard() { + REQUIRE(self.stopped()); + self._check.clear(); + for (auto &entry: self._state) { + FdContext &ctx = entry.second; + const bool was_added = (ctx._epoll_read || ctx._epoll_write); + if (was_added) { + self._selector.remove(ctx._fd); + } + if (ctx._reader) { + auto reader = std::exchange(ctx._reader, nullptr); + reader.resume(); + } + if (ctx._writer) { + auto writer = std::exchange(ctx._writer, nullptr); + writer.resume(); + } + } + self._state.clear(); + REQUIRE(self._check.empty()); + self._check_queue = true; + self.handle_queue(); + } +}; + void SelectorThread::main() { - _thread_id = std::this_thread::get_id(); - _thread_id.notify_all(); - while (!_shutdown) { + RunGuard guard(*this); + while (running()) { update_epoll_state(); - fprintf(stderr, "--> epoll wait\n"); _selector.poll(1000); - fprintf(stderr, "<-- epoll wait: got %zu events\n", _selector.num_events()); _selector.dispatch(*this); handle_queue(); } - fprintf(stderr, "event loop cleanup\n"); - cancel_epoll_state(); - force_handle_queue(); } void SelectorThread::init_shutdown() { - fprintf(stderr, "init_shutdown\n"); async_shutdown(); } void SelectorThread::fini_shutdown() { - fprintf(stderr, "--> fini_shutdown\n"); _thread.join(); - fprintf(stderr, "<-- fini_shutdown\n"); } SelectorThread::~SelectorThread() |