aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-01-03 15:18:08 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-01-03 15:18:08 +0000
commitc75aa48ef5794b05e70e5fbd5fc9cb4fee0b8fde (patch)
tree37384130788703823b2d9730a1b3f78d44f9019c /vespalib
parent0fe8cdb0a96fdce305a85058a3e84fcba69af5d4 (diff)
remove _shutdown flag
- remove debug printf - move shutdown code into RunGuard class - let readable/writable return bool (ok/cancel)
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_io.cpp154
1 files changed, 69 insertions, 85 deletions
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp
index a38092a79fd..8d628fd7887 100644
--- a/vespalib/src/vespa/vespalib/coro/async_io.cpp
+++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp
@@ -38,12 +38,12 @@ struct SelectorThread : AsyncIo {
_epoll_read(false), _epoll_write(false),
_reader(nullptr), _writer(nullptr) {}
};
+ struct RunGuard;
using ThreadId = std::atomic<std::thread::id>;
-
+
std::map<int,FdContext> _state;
std::set<int> _check;
Selector<FdContext> _selector;
- bool _shutdown;
std::thread _thread;
ThreadId _thread_id;
bool _check_queue;
@@ -55,7 +55,6 @@ struct SelectorThread : AsyncIo {
: _state(),
_check(),
_selector(),
- _shutdown(false),
_thread(),
_thread_id(std::thread::id()),
_check_queue(false),
@@ -70,7 +69,13 @@ struct SelectorThread : AsyncIo {
void init_shutdown() override;
void fini_shutdown() override;
~SelectorThread();
- bool is_my_thread() const {
+ bool running() const noexcept {
+ return (_thread_id.load(std::memory_order_relaxed) != std::thread::id());
+ }
+ bool stopped() const noexcept {
+ return (_thread_id.load(std::memory_order_relaxed) == std::thread::id());
+ }
+ bool in_thread() const noexcept {
return (std::this_thread::get_id() == _thread_id.load(std::memory_order_relaxed));
}
auto protect() { return std::lock_guard(_lock); }
@@ -80,12 +85,12 @@ struct SelectorThread : AsyncIo {
awaiter(SelectorThread &self_in, bool ready_in) noexcept
: awaiter_base(self_in), ready(ready_in) {}
bool await_ready() const noexcept { return ready; }
- bool await_resume() const noexcept { return self.is_my_thread(); }
+ bool await_resume() const noexcept { return self.in_thread(); }
bool await_suspend(Handle handle) __attribute__((noinline)) {
bool need_wakeup = false;
{
auto guard = self.protect();
- if (self._shutdown) {
+ if (self.stopped()) {
return false;
}
need_wakeup = self._queue.empty();
@@ -99,14 +104,14 @@ struct SelectorThread : AsyncIo {
};
return awaiter(*this, ready);
}
- auto enter_thread() { return queue_self_unless(is_my_thread()); }
+ auto enter_thread() { return queue_self_unless(in_thread()); }
auto readable(int fd) {
struct awaiter : awaiter_base {
int fd;
awaiter(SelectorThread &self_in, int fd_in) noexcept
: awaiter_base(self_in), fd(fd_in) {}
- bool await_ready() const noexcept { return (fd < 0) || self._shutdown; }
- void await_resume() const noexcept {}
+ bool await_ready() const noexcept { return (fd < 0) || self.stopped(); }
+ bool await_resume() const noexcept { return self.running(); }
void await_suspend(Handle handle) __attribute__((noinline)) {
auto [pos, ignore] = self._state.try_emplace(fd, fd);
FdContext &state = pos->second;
@@ -115,8 +120,7 @@ struct SelectorThread : AsyncIo {
self._check.insert(state._fd);
}
};
- fprintf(stderr, "await readable(%d)\n", fd);
- REQUIRE(is_my_thread());
+ REQUIRE(in_thread());
return awaiter(*this, fd);
}
auto writable(int fd) {
@@ -124,8 +128,8 @@ struct SelectorThread : AsyncIo {
int fd;
awaiter(SelectorThread &self_in, int fd_in) noexcept
: awaiter_base(self_in), fd(fd_in) {}
- bool await_ready() const noexcept { return (fd < 0) || self._shutdown; }
- void await_resume() const noexcept {}
+ bool await_ready() const noexcept { return (fd < 0) || self.stopped(); }
+ bool await_resume() const noexcept { return self.running(); }
void await_suspend(Handle handle) __attribute__((noinline)) {
auto [pos, ignore] = self._state.try_emplace(fd, fd);
FdContext &state = pos->second;
@@ -134,8 +138,7 @@ struct SelectorThread : AsyncIo {
self._check.insert(state._fd);
}
};
- fprintf(stderr, "await writable(%d)\n", fd);
- REQUIRE(is_my_thread());
+ REQUIRE(in_thread());
return awaiter(*this, fd);
}
void update_epoll_state() {
@@ -150,20 +153,15 @@ struct SelectorThread : AsyncIo {
bool read_changed = ctx._epoll_read != bool(ctx._reader);
bool write_changed = ctx._epoll_write != bool(ctx._writer);
if (read_changed || write_changed) {
- fprintf(stderr, "epoll update %d %s %s\n", ctx._fd,
- ctx._reader ? "read" : "-", ctx._writer ? "write" : "-");
_selector.update(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer));
}
} else {
- fprintf(stderr, "epoll add %d %s %s\n", ctx._fd,
- ctx._reader ? "read" : "-", ctx._writer ? "write" : "-");
_selector.add(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer));
}
ctx._epoll_read = bool(ctx._reader);
ctx._epoll_write = bool(ctx._writer);
} else {
if (was_added) {
- fprintf(stderr, "epoll remove %d\n", ctx._fd);
_selector.remove(ctx._fd);
}
_state.erase(pos);
@@ -171,28 +169,6 @@ struct SelectorThread : AsyncIo {
}
_check.clear();
}
- void cancel_epoll_state() {
- REQUIRE(_shutdown);
- _check.clear();
- for (auto &entry: _state) {
- FdContext &ctx = entry.second;
- const bool was_added = (ctx._epoll_read || ctx._epoll_write);
- if (was_added) {
- fprintf(stderr, "epoll remove %d (shutdown)\n", ctx._fd);
- _selector.remove(ctx._fd);
- }
- if (ctx._reader) {
- auto reader = std::exchange(ctx._reader, nullptr);
- reader.resume();
- }
- if (ctx._writer) {
- auto writer = std::exchange(ctx._writer, nullptr);
- writer.resume();
- }
- }
- _state.clear();
- REQUIRE(_check.empty());
- }
void handle_wakeup() { _check_queue = true; }
void handle_queue() {
if (!_check_queue) {
@@ -203,26 +179,18 @@ struct SelectorThread : AsyncIo {
auto guard = protect();
std::swap(_todo, _queue);
}
- fprintf(stderr, "todo list: %zu items\n", _todo.size());
for (auto &&handle: _todo) {
handle.resume();
}
_todo.clear();
}
- void force_handle_queue() {
- REQUIRE(_shutdown);
- _check_queue = true;
- handle_queue();
- }
void handle_event(FdContext &ctx, bool read, bool write) {
_check.insert(ctx._fd);
if (read && ctx._reader) {
- fprintf(stderr, "resume readable(%d)\n", ctx._fd);
auto reader = std::exchange(ctx._reader, nullptr);
reader.resume();
}
if (write && ctx._writer) {
- fprintf(stderr, "resume writable(%d)\n", ctx._fd);
auto writer = std::exchange(ctx._writer, nullptr);
writer.resume();
}
@@ -231,35 +199,32 @@ struct SelectorThread : AsyncIo {
return "selector-thread";
}
Lazy<SocketHandle> accept(ServerSocket &server_socket) override {
- fprintf(stderr, "async accept(%d)\n", server_socket.get_fd());
- bool in_my_thread = co_await enter_thread();
- if (in_my_thread) {
- co_await readable(server_socket.get_fd());
- if (!_shutdown) {
+ bool in_thread = co_await enter_thread();
+ if (in_thread) {
+ bool can_read = co_await readable(server_socket.get_fd());
+ if (can_read) {
co_return server_socket.accept();
}
}
co_return SocketHandle(-ECANCELED);
}
Lazy<SocketHandle> connect(const SocketAddress &addr) override {
- fprintf(stderr, "async connect(%s)\n", addr.spec().c_str());
- bool in_my_thread = co_await enter_thread();
- if (in_my_thread) {
+ bool in_thread = co_await enter_thread();
+ if (in_thread) {
auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); };
auto socket = addr.connect(tweak);
- co_await writable(socket.get());
- if (!_shutdown) {
+ bool can_write = co_await writable(socket.get());
+ if (can_write) {
co_return std::move(socket);
}
}
co_return SocketHandle(-ECANCELED);
}
Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override {
- fprintf(stderr, "async read(%d)\n", socket.get());
- bool in_my_thread = co_await enter_thread();
- if (in_my_thread) {
- co_await readable(socket.get());
- if (!_shutdown) {
+ bool in_thread = co_await enter_thread();
+ if (in_thread) {
+ bool can_read = co_await readable(socket.get());
+ if (can_read) {
ssize_t res = socket.read(buf, len);
co_return (res < 0) ? -errno : res;
}
@@ -267,11 +232,10 @@ struct SelectorThread : AsyncIo {
co_return -ECANCELED;
}
Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override {
- fprintf(stderr, "async write(%d)\n", socket.get());
- bool in_my_thread = co_await enter_thread();
- if (in_my_thread) {
- co_await writable(socket.get());
- if (!_shutdown) {
+ bool in_thread = co_await enter_thread();
+ if (in_thread) {
+ bool can_write = co_await writable(socket.get());
+ if (can_write) {
ssize_t res = socket.write(buf, len);
co_return (res < 0) ? -errno : res;
}
@@ -282,11 +246,10 @@ struct SelectorThread : AsyncIo {
co_return co_await queue_self_unless(false);
}
Detached async_shutdown() {
- bool in_my_thread = co_await enter_thread();
- REQUIRE(in_my_thread && "unable to initialize shutdown of internal thread");
+ bool in_thread = co_await enter_thread();
+ REQUIRE(in_thread && "unable to initialize shutdown of internal thread");
{
auto guard = protect();
- _shutdown = true;
_thread_id = std::thread::id();
}
}
@@ -295,42 +258,63 @@ struct SelectorThread : AsyncIo {
void
SelectorThread::start()
{
- fprintf(stderr, "start\n");
_thread = std::thread(&SelectorThread::main, this);
_thread_id.wait(std::thread::id());
}
+struct SelectorThread::RunGuard {
+ SelectorThread &self;
+ RunGuard(SelectorThread &self_in) noexcept : self(self_in) {
+ self._thread_id = std::this_thread::get_id();
+ self._thread_id.notify_all();
+ }
+ ~RunGuard() {
+ REQUIRE(self.stopped());
+ self._check.clear();
+ for (auto &entry: self._state) {
+ FdContext &ctx = entry.second;
+ const bool was_added = (ctx._epoll_read || ctx._epoll_write);
+ if (was_added) {
+ self._selector.remove(ctx._fd);
+ }
+ if (ctx._reader) {
+ auto reader = std::exchange(ctx._reader, nullptr);
+ reader.resume();
+ }
+ if (ctx._writer) {
+ auto writer = std::exchange(ctx._writer, nullptr);
+ writer.resume();
+ }
+ }
+ self._state.clear();
+ REQUIRE(self._check.empty());
+ self._check_queue = true;
+ self.handle_queue();
+ }
+};
+
void
SelectorThread::main()
{
- _thread_id = std::this_thread::get_id();
- _thread_id.notify_all();
- while (!_shutdown) {
+ RunGuard guard(*this);
+ while (running()) {
update_epoll_state();
- fprintf(stderr, "--> epoll wait\n");
_selector.poll(1000);
- fprintf(stderr, "<-- epoll wait: got %zu events\n", _selector.num_events());
_selector.dispatch(*this);
handle_queue();
}
- fprintf(stderr, "event loop cleanup\n");
- cancel_epoll_state();
- force_handle_queue();
}
void
SelectorThread::init_shutdown()
{
- fprintf(stderr, "init_shutdown\n");
async_shutdown();
}
void
SelectorThread::fini_shutdown()
{
- fprintf(stderr, "--> fini_shutdown\n");
_thread.join();
- fprintf(stderr, "<-- fini_shutdown\n");
}
SelectorThread::~SelectorThread()