summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2022-12-21 12:30:34 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2022-12-23 12:00:30 +0000
commit7d5ebdb57f0d79effe57ed26b54ac66ec62decac (patch)
treec86f38bd2f69c88275e16fd0dd97053c43eb5136 /vespalib
parenteb9c62eebca80aec6b80b24d167407764c97a048 (diff)
owner, shutdown and error propagation for async io
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/coro/async_io/async_io_test.cpp31
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_io.cpp324
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_io.h36
3 files changed, 297 insertions, 94 deletions
diff --git a/vespalib/src/tests/coro/async_io/async_io_test.cpp b/vespalib/src/tests/coro/async_io/async_io_test.cpp
index 9b8a98a8f69..a506a5dd0d4 100644
--- a/vespalib/src/tests/coro/async_io/async_io_test.cpp
+++ b/vespalib/src/tests/coro/async_io/async_io_test.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/coro/lazy.h>
+#include <vespa/vespalib/coro/detached.h>
#include <vespa/vespalib/coro/completion.h>
#include <vespa/vespalib/coro/async_io.h>
#include <vespa/vespalib/net/socket_spec.h>
@@ -12,6 +13,13 @@
using namespace vespalib;
using namespace vespalib::coro;
+Detached self_exiting_run_loop(AsyncIo::SP async) {
+ for (size_t i = 0; co_await async->schedule(); ++i) {
+ fprintf(stderr, "self_exiting_run_loop -> current value: %zu\n", i);
+ }
+ fprintf(stderr, "self_exiting_run_loop -> exiting\n");
+}
+
Work run_loop(AsyncIo &async, int a, int b) {
for (int i = a; i < b; ++i) {
co_await async.schedule();
@@ -22,20 +30,29 @@ Work run_loop(AsyncIo &async, int a, int b) {
TEST(AsyncIoTest, create_async_io) {
auto async = AsyncIo::create();
- ASSERT_TRUE(async);
- fprintf(stderr, "async_io impl: %s\n", async->get_impl_spec().c_str());
+ AsyncIo &api = async;
+ fprintf(stderr, "async_io impl: %s\n", api.get_impl_spec().c_str());
}
TEST(AsyncIoTest, run_stuff_in_async_io_context) {
auto async = AsyncIo::create();
- auto f1 = make_future(run_loop(*async, 10, 20));
- auto f2 = make_future(run_loop(*async, 20, 30));
- auto f3 = make_future(run_loop(*async, 30, 40));
+ auto f1 = make_future(run_loop(async, 10, 20));
+ auto f2 = make_future(run_loop(async, 20, 30));
+ auto f3 = make_future(run_loop(async, 30, 40));
f1.wait();
f2.wait();
f3.wait();
}
+TEST(AsyncIoTest, shutdown_with_self_exiting_coroutine) {
+ auto async = AsyncIo::create();
+ auto f1 = make_future(run_loop(async, 10, 20));
+ auto f2 = make_future(run_loop(async, 20, 30));
+ self_exiting_run_loop(async.share());
+ f1.wait();
+ f2.wait();
+}
+
Lazy<size_t> write_msg(AsyncIo &async, SocketHandle &socket, const vespalib::string &msg) {
size_t written = 0;
while (written < msg.size()) {
@@ -103,8 +120,8 @@ TEST(AsyncIoTest, raw_socket_io) {
ServerSocket server_socket("tcp/0");
server_socket.set_blocking(false);
auto async = AsyncIo::create();
- auto f1 = make_future(async_server(*async, server_socket));
- auto f2 = make_future(async_client(*async, server_socket));
+ auto f1 = make_future(async_server(async, server_socket));
+ auto f2 = make_future(async_client(async, server_socket));
f1.wait();
f2.wait();
}
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp
index 406debbe1f9..a38092a79fd 100644
--- a/vespalib/src/vespa/vespalib/coro/async_io.cpp
+++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp
@@ -16,21 +16,17 @@ namespace {
using Handle = std::coroutine_handle<>;
-template <typename F>
-struct await_void {
- bool ready;
- F on_suspend;
- await_void(bool ready_in, F on_suspend_in)
- : ready(ready_in), on_suspend(on_suspend_in) {}
- bool await_ready() const noexcept { return ready; }
- auto await_suspend(Handle handle) const { return on_suspend(handle); }
- constexpr void await_resume() noexcept {}
-};
-template <typename F>
-await_void(bool ready_in, F on_suspend_in) -> await_void<F>;
-
struct SelectorThread : AsyncIo {
+ struct awaiter_base {
+ SelectorThread &self;
+ awaiter_base(SelectorThread &self_in) noexcept : self(self_in) {}
+ awaiter_base(const awaiter_base &) = delete;
+ awaiter_base &operator=(const awaiter_base &) = delete;
+ awaiter_base(awaiter_base &&) = delete;
+ awaiter_base &operator=(awaiter_base &&) = delete;
+ };
+
struct FdContext {
int _fd;
bool _epoll_read;
@@ -42,70 +38,105 @@ struct SelectorThread : AsyncIo {
_epoll_read(false), _epoll_write(false),
_reader(nullptr), _writer(nullptr) {}
};
+ using ThreadId = std::atomic<std::thread::id>;
+
std::map<int,FdContext> _state;
- std::set<int> _check;
-
- Selector<FdContext> _selector;
- bool _shutdown;
- std::thread _thread;
- bool _check_queue;
- std::vector<Handle> _todo;
- std::mutex _lock;
- std::vector<Handle> _queue;
+ std::set<int> _check;
+ Selector<FdContext> _selector;
+ bool _shutdown;
+ std::thread _thread;
+ ThreadId _thread_id;
+ bool _check_queue;
+ std::vector<Handle> _todo;
+ std::mutex _lock;
+ std::vector<Handle> _queue;
SelectorThread()
: _state(),
_check(),
_selector(),
_shutdown(false),
- _thread(&SelectorThread::main, this),
+ _thread(),
+ _thread_id(std::thread::id()),
_check_queue(false),
_todo(),
_lock(),
- _queue() {}
+ _queue()
+ {
+ static_assert(ThreadId::is_always_lock_free);
+ }
+ void start() override;
void main();
+ void init_shutdown() override;
+ void fini_shutdown() override;
~SelectorThread();
- bool is_my_thread() const { return (std::this_thread::get_id() == _thread.get_id()); }
+ bool is_my_thread() const {
+ return (std::this_thread::get_id() == _thread_id.load(std::memory_order_relaxed));
+ }
auto protect() { return std::lock_guard(_lock); }
auto queue_self_unless(bool ready) {
- return await_void(ready,
- [this](Handle handle)
- {
- bool need_wakeup = false;
- {
- auto guard = protect();
- need_wakeup = _queue.empty();
- _queue.push_back(handle);
- }
- if (need_wakeup) {
- _selector.wakeup();
- }
- });
+ struct awaiter : awaiter_base {
+ bool ready;
+ awaiter(SelectorThread &self_in, bool ready_in) noexcept
+ : awaiter_base(self_in), ready(ready_in) {}
+ bool await_ready() const noexcept { return ready; }
+ bool await_resume() const noexcept { return self.is_my_thread(); }
+ bool await_suspend(Handle handle) __attribute__((noinline)) {
+ bool need_wakeup = false;
+ {
+ auto guard = self.protect();
+ if (self._shutdown) {
+ return false;
+ }
+ need_wakeup = self._queue.empty();
+ self._queue.push_back(handle);
+ }
+ if (need_wakeup) {
+ self._selector.wakeup();
+ }
+ return true;
+ }
+ };
+ return awaiter(*this, ready);
}
auto enter_thread() { return queue_self_unless(is_my_thread()); }
auto readable(int fd) {
+ struct awaiter : awaiter_base {
+ int fd;
+ awaiter(SelectorThread &self_in, int fd_in) noexcept
+ : awaiter_base(self_in), fd(fd_in) {}
+ bool await_ready() const noexcept { return (fd < 0) || self._shutdown; }
+ void await_resume() const noexcept {}
+ void await_suspend(Handle handle) __attribute__((noinline)) {
+ auto [pos, ignore] = self._state.try_emplace(fd, fd);
+ FdContext &state = pos->second;
+ REQUIRE(!state._reader && "conflicting reads detected");
+ state._reader = handle;
+ self._check.insert(state._fd);
+ }
+ };
+ fprintf(stderr, "await readable(%d)\n", fd);
REQUIRE(is_my_thread());
- return await_void((fd < 0),
- [this, fd](Handle handle)
- {
- auto [pos, ignore] = _state.try_emplace(fd, fd);
- FdContext &state = pos->second;
- REQUIRE(!state._reader && "conflicting reads detected");
- state._reader = handle;
- _check.insert(state._fd);
- });
+ return awaiter(*this, fd);
}
auto writable(int fd) {
+ struct awaiter : awaiter_base {
+ int fd;
+ awaiter(SelectorThread &self_in, int fd_in) noexcept
+ : awaiter_base(self_in), fd(fd_in) {}
+ bool await_ready() const noexcept { return (fd < 0) || self._shutdown; }
+ void await_resume() const noexcept {}
+ void await_suspend(Handle handle) __attribute__((noinline)) {
+ auto [pos, ignore] = self._state.try_emplace(fd, fd);
+ FdContext &state = pos->second;
+ REQUIRE(!state._writer && "conflicting reads detected");
+ state._writer = handle;
+ self._check.insert(state._fd);
+ }
+ };
+ fprintf(stderr, "await writable(%d)\n", fd);
REQUIRE(is_my_thread());
- return await_void((fd < 0),
- [this, fd](Handle handle)
- {
- auto [pos, ignore] = _state.try_emplace(fd, fd);
- FdContext &state = pos->second;
- REQUIRE(!state._writer && "conflicting write detected");
- state._writer = handle;
- _check.insert(state._fd);
- });
+ return awaiter(*this, fd);
}
void update_epoll_state() {
for (int fd: _check) {
@@ -119,15 +150,20 @@ struct SelectorThread : AsyncIo {
bool read_changed = ctx._epoll_read != bool(ctx._reader);
bool write_changed = ctx._epoll_write != bool(ctx._writer);
if (read_changed || write_changed) {
+ fprintf(stderr, "epoll update %d %s %s\n", ctx._fd,
+ ctx._reader ? "read" : "-", ctx._writer ? "write" : "-");
_selector.update(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer));
}
} else {
+ fprintf(stderr, "epoll add %d %s %s\n", ctx._fd,
+ ctx._reader ? "read" : "-", ctx._writer ? "write" : "-");
_selector.add(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer));
}
ctx._epoll_read = bool(ctx._reader);
ctx._epoll_write = bool(ctx._writer);
} else {
if (was_added) {
+ fprintf(stderr, "epoll remove %d\n", ctx._fd);
_selector.remove(ctx._fd);
}
_state.erase(pos);
@@ -135,6 +171,28 @@ struct SelectorThread : AsyncIo {
}
_check.clear();
}
+ void cancel_epoll_state() {
+ REQUIRE(_shutdown);
+ _check.clear();
+ for (auto &entry: _state) {
+ FdContext &ctx = entry.second;
+ const bool was_added = (ctx._epoll_read || ctx._epoll_write);
+ if (was_added) {
+ fprintf(stderr, "epoll remove %d (shutdown)\n", ctx._fd);
+ _selector.remove(ctx._fd);
+ }
+ if (ctx._reader) {
+ auto reader = std::exchange(ctx._reader, nullptr);
+ reader.resume();
+ }
+ if (ctx._writer) {
+ auto writer = std::exchange(ctx._writer, nullptr);
+ writer.resume();
+ }
+ }
+ _state.clear();
+ REQUIRE(_check.empty());
+ }
void handle_wakeup() { _check_queue = true; }
void handle_queue() {
if (!_check_queue) {
@@ -145,18 +203,26 @@ struct SelectorThread : AsyncIo {
auto guard = protect();
std::swap(_todo, _queue);
}
+ fprintf(stderr, "todo list: %zu items\n", _todo.size());
for (auto &&handle: _todo) {
handle.resume();
}
_todo.clear();
}
+ void force_handle_queue() {
+ REQUIRE(_shutdown);
+ _check_queue = true;
+ handle_queue();
+ }
void handle_event(FdContext &ctx, bool read, bool write) {
_check.insert(ctx._fd);
if (read && ctx._reader) {
+ fprintf(stderr, "resume readable(%d)\n", ctx._fd);
auto reader = std::exchange(ctx._reader, nullptr);
reader.resume();
}
if (write && ctx._writer) {
+ fprintf(stderr, "resume writable(%d)\n", ctx._fd);
auto writer = std::exchange(ctx._writer, nullptr);
writer.resume();
}
@@ -165,56 +231,114 @@ struct SelectorThread : AsyncIo {
return "selector-thread";
}
Lazy<SocketHandle> accept(ServerSocket &server_socket) override {
- co_await enter_thread();
- co_await readable(server_socket.get_fd());
- co_return server_socket.accept();
+ fprintf(stderr, "async accept(%d)\n", server_socket.get_fd());
+ bool in_my_thread = co_await enter_thread();
+ if (in_my_thread) {
+ co_await readable(server_socket.get_fd());
+ if (!_shutdown) {
+ co_return server_socket.accept();
+ }
+ }
+ co_return SocketHandle(-ECANCELED);
}
Lazy<SocketHandle> connect(const SocketAddress &addr) override {
- co_await enter_thread();
- auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); };
- auto socket = addr.connect(tweak);
- co_await writable(socket.get());
- co_return std::move(socket);
+ fprintf(stderr, "async connect(%s)\n", addr.spec().c_str());
+ bool in_my_thread = co_await enter_thread();
+ if (in_my_thread) {
+ auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); };
+ auto socket = addr.connect(tweak);
+ co_await writable(socket.get());
+ if (!_shutdown) {
+ co_return std::move(socket);
+ }
+ }
+ co_return SocketHandle(-ECANCELED);
}
Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override {
- co_await enter_thread();
- co_await readable(socket.get());
- co_return socket.read(buf, len);
+ fprintf(stderr, "async read(%d)\n", socket.get());
+ bool in_my_thread = co_await enter_thread();
+ if (in_my_thread) {
+ co_await readable(socket.get());
+ if (!_shutdown) {
+ ssize_t res = socket.read(buf, len);
+ co_return (res < 0) ? -errno : res;
+ }
+ }
+ co_return -ECANCELED;
}
Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override {
- co_await enter_thread();
- co_await writable(socket.get());
- co_return socket.write(buf, len);
+ fprintf(stderr, "async write(%d)\n", socket.get());
+ bool in_my_thread = co_await enter_thread();
+ if (in_my_thread) {
+ co_await writable(socket.get());
+ if (!_shutdown) {
+ ssize_t res = socket.write(buf, len);
+ co_return (res < 0) ? -errno : res;
+ }
+ }
+ co_return -ECANCELED;
}
- Work schedule() override {
- co_await queue_self_unless(false);
- co_return Done{};
+ Lazy<bool> schedule() override {
+ co_return co_await queue_self_unless(false);
}
- Detached shutdown() {
- co_await enter_thread();
+ Detached async_shutdown() {
+ bool in_my_thread = co_await enter_thread();
+ REQUIRE(in_my_thread && "unable to initialize shutdown of internal thread");
{
auto guard = protect();
_shutdown = true;
+ _thread_id = std::thread::id();
}
}
};
void
+SelectorThread::start()
+{
+ fprintf(stderr, "start\n");
+ _thread = std::thread(&SelectorThread::main, this);
+ _thread_id.wait(std::thread::id());
+}
+
+void
SelectorThread::main()
{
- const int ms_timeout = 100;
+ _thread_id = std::this_thread::get_id();
+ _thread_id.notify_all();
while (!_shutdown) {
update_epoll_state();
- _selector.poll(ms_timeout);
+ fprintf(stderr, "--> epoll wait\n");
+ _selector.poll(1000);
+ fprintf(stderr, "<-- epoll wait: got %zu events\n", _selector.num_events());
_selector.dispatch(*this);
handle_queue();
}
+ fprintf(stderr, "event loop cleanup\n");
+ cancel_epoll_state();
+ force_handle_queue();
+}
+
+void
+SelectorThread::init_shutdown()
+{
+ fprintf(stderr, "init_shutdown\n");
+ async_shutdown();
}
-SelectorThread::~SelectorThread() {
- shutdown();
- REQUIRE(!is_my_thread());
+void
+SelectorThread::fini_shutdown()
+{
+ fprintf(stderr, "--> fini_shutdown\n");
_thread.join();
+ fprintf(stderr, "<-- fini_shutdown\n");
+}
+
+SelectorThread::~SelectorThread()
+{
+ REQUIRE(_state.empty());
+ REQUIRE(_check.empty());
+ REQUIRE(_todo.empty());
+ REQUIRE(_queue.empty());
}
}
@@ -222,9 +346,45 @@ SelectorThread::~SelectorThread() {
AsyncIo::~AsyncIo() = default;
AsyncIo::AsyncIo() = default;
-std::shared_ptr<AsyncIo>
+AsyncIo::Owner::Owner(std::shared_ptr<AsyncIo> async_io)
+ : _async_io(std::move(async_io)),
+ _init_shutdown_called(false),
+ _fini_shutdown_called(false)
+{
+ _async_io->start();
+}
+
+void
+AsyncIo::Owner::init_shutdown()
+{
+ if (!_init_shutdown_called) {
+ if (_async_io) {
+ _async_io->init_shutdown();
+ }
+ _init_shutdown_called = true;
+ }
+}
+
+void
+AsyncIo::Owner::fini_shutdown()
+{
+ if (!_fini_shutdown_called) {
+ init_shutdown();
+ if (_async_io) {
+ _async_io->fini_shutdown();
+ }
+ _fini_shutdown_called = true;
+ }
+}
+
+AsyncIo::Owner::~Owner()
+{
+ fini_shutdown();
+}
+
+AsyncIo::Owner
AsyncIo::create() {
- return std::make_shared<SelectorThread>();
+ return Owner(std::make_shared<SelectorThread>());
}
}
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.h b/vespalib/src/vespa/vespalib/coro/async_io.h
index 91449780789..72e2ef3a312 100644
--- a/vespalib/src/vespa/vespalib/coro/async_io.h
+++ b/vespalib/src/vespa/vespalib/coro/async_io.h
@@ -11,7 +11,7 @@
namespace vespalib::coro {
-// Interfaces defining functions used to perform async io. The initial
+// Interface defining functions used to perform async io. The initial
// implementation will perform epoll in a single dedicated thread. The
// idea is to be able to switch to an implementation using io_uring
// some time in the future without having to change existing client
@@ -24,9 +24,29 @@ struct AsyncIo : std::enable_shared_from_this<AsyncIo> {
AsyncIo &operator=(const AsyncIo &) = delete;
AsyncIo &operator=(AsyncIo &&) = delete;
virtual ~AsyncIo();
-
- // create an async_io 'runtime' with the default implementation
- static std::shared_ptr<AsyncIo> create();
+ using SP = std::shared_ptr<AsyncIo>;
+
+ // thin wrapper used by the owner to handle lifetime
+ class Owner {
+ private:
+ std::shared_ptr<AsyncIo> _async_io;
+ bool _init_shutdown_called;
+ bool _fini_shutdown_called;
+ public:
+ Owner(std::shared_ptr<AsyncIo> async_io);
+ Owner(const Owner &) = delete;
+ Owner &operator=(const Owner &) = delete;
+ Owner(Owner &&) = default;
+ Owner &operator=(Owner &&) = default;
+ AsyncIo::SP share() { return _async_io->shared_from_this(); }
+ operator AsyncIo &() { return *_async_io; }
+ void init_shutdown();
+ void fini_shutdown();
+ ~Owner();
+ };
+
+ // create an async_io 'runtime'
+ static Owner create();
// implementation tag
virtual vespalib::string get_impl_spec() = 0;
@@ -36,11 +56,17 @@ struct AsyncIo : std::enable_shared_from_this<AsyncIo> {
virtual Lazy<SocketHandle> connect(const SocketAddress &addr) = 0;
virtual Lazy<ssize_t> read(SocketHandle &handle, char *buf, size_t len) = 0;
virtual Lazy<ssize_t> write(SocketHandle &handle, const char *buf, size_t len) = 0;
- virtual Work schedule() = 0;
+ virtual Lazy<bool> schedule() = 0;
protected:
// may only be created via subclass
AsyncIo();
+
+private:
+ // called by Owner
+ virtual void start() = 0;
+ virtual void init_shutdown() = 0;
+ virtual void fini_shutdown() = 0;
};
}