diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2022-12-05 11:14:10 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2022-12-19 10:59:33 +0000 |
commit | 40364090a3642888db15c9e2e7539a5098d24574 (patch) | |
tree | 53325ee1498b5c31df6f3d620029939cca5c3db1 /vespalib | |
parent | 7b265f56441a2c6107e01e78082d3f1f26883b0b (diff) |
async io experiments
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/tests/coro/async_io/CMakeLists.txt | 9 | ||||
-rw-r--r-- | vespalib/src/tests/coro/async_io/async_io_test.cpp | 112 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/async_io.cpp | 230 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/async_io.h | 46 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/coro/lazy.h | 4 |
8 files changed, 404 insertions, 0 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index 50cbe8879b5..a3d5054973f 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -44,6 +44,7 @@ vespa_define_module( src/tests/component src/tests/compress src/tests/compression + src/tests/coro/async_io src/tests/coro/detached src/tests/coro/generator src/tests/coro/lazy diff --git a/vespalib/src/tests/coro/async_io/CMakeLists.txt b/vespalib/src/tests/coro/async_io/CMakeLists.txt new file mode 100644 index 00000000000..25274198e9a --- /dev/null +++ b/vespalib/src/tests/coro/async_io/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_async_io_test_app TEST + SOURCES + async_io_test.cpp + DEPENDS + vespalib + GTest::GTest +) +vespa_add_test(NAME vespalib_async_io_test_app COMMAND vespalib_async_io_test_app) diff --git a/vespalib/src/tests/coro/async_io/async_io_test.cpp b/vespalib/src/tests/coro/async_io/async_io_test.cpp new file mode 100644 index 00000000000..9b8a98a8f69 --- /dev/null +++ b/vespalib/src/tests/coro/async_io/async_io_test.cpp @@ -0,0 +1,112 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/coro/lazy.h> +#include <vespa/vespalib/coro/completion.h> +#include <vespa/vespalib/coro/async_io.h> +#include <vespa/vespalib/net/socket_spec.h> +#include <vespa/vespalib/net/server_socket.h> +#include <vespa/vespalib/net/socket_handle.h> +#include <vespa/vespalib/net/socket_address.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace vespalib; +using namespace vespalib::coro; + +Work run_loop(AsyncIo &async, int a, int b) { + for (int i = a; i < b; ++i) { + co_await async.schedule(); + fprintf(stderr, "run_loop [%d,%d> -> current value: %d\n", a, b, i); + } + co_return Done{}; +} + +TEST(AsyncIoTest, create_async_io) { + auto async = AsyncIo::create(); + ASSERT_TRUE(async); + fprintf(stderr, "async_io impl: %s\n", async->get_impl_spec().c_str()); +} + +TEST(AsyncIoTest, run_stuff_in_async_io_context) { + auto async = AsyncIo::create(); + auto f1 = make_future(run_loop(*async, 10, 20)); + auto f2 = make_future(run_loop(*async, 20, 30)); + auto f3 = make_future(run_loop(*async, 30, 40)); + f1.wait(); + f2.wait(); + f3.wait(); +} + +Lazy<size_t> write_msg(AsyncIo &async, SocketHandle &socket, const vespalib::string &msg) { + size_t written = 0; + while (written < msg.size()) { + size_t write_size = (msg.size() - written); + ssize_t write_result = co_await async.write(socket, msg.data() + written, write_size); + if (write_result <= 0) { + co_return written; + } + written += write_result; + } + co_return written; +} + +Lazy<vespalib::string> read_msg(AsyncIo &async, SocketHandle &socket, size_t wanted_bytes) { + char tmp[64]; + vespalib::string result; + while (result.size() < wanted_bytes) { + size_t read_size = std::min(sizeof(tmp), wanted_bytes - result.size()); + ssize_t read_result = co_await async.read(socket, tmp, read_size); + if (read_result <= 0) { + co_return result; + } + result.append(tmp, read_result); + } + co_return result; +} + +Work verify_socket_io(AsyncIo &async, SocketHandle &socket, bool is_server) { + vespalib::string server_message = "hello, this is the server speaking"; + vespalib::string client_message = "please pick up, I need to talk to you"; + if (is_server) { + vespalib::string read = co_await read_msg(async, socket, client_message.size()); + EXPECT_EQ(client_message, read); + size_t written = co_await write_msg(async, socket, server_message); + EXPECT_EQ(written, ssize_t(server_message.size())); + } else { + size_t written = co_await write_msg(async, socket, client_message); + EXPECT_EQ(written, ssize_t(client_message.size())); + vespalib::string read = co_await read_msg(async, socket, server_message.size()); + EXPECT_EQ(server_message, read); + } + co_return Done{}; +} + +Work async_server(AsyncIo &async, ServerSocket &server_socket) { + auto server_addr = server_socket.address(); + auto server_spec = server_addr.spec(); + fprintf(stderr, "listening at '%s' (fd = %d)\n", server_spec.c_str(), server_socket.get_fd()); + auto socket = co_await async.accept(server_socket); + fprintf(stderr, "server fd: %d\n", socket.get()); + co_return co_await verify_socket_io(async, socket, true); +} + +Work async_client(AsyncIo &async, ServerSocket &server_socket) { + auto server_addr = server_socket.address(); + auto server_spec = server_addr.spec(); + fprintf(stderr, "connecting to '%s'\n", server_spec.c_str()); + auto client_addr = SocketSpec(server_spec).client_address(); + auto socket = co_await async.connect(client_addr); + fprintf(stderr, "client fd: %d\n", socket.get()); + co_return co_await verify_socket_io(async, socket, false); +} + +TEST(AsyncIoTest, raw_socket_io) { + ServerSocket server_socket("tcp/0"); + server_socket.set_blocking(false); + auto async = AsyncIo::create(); + auto f1 = make_future(async_server(*async, server_socket)); + auto f2 = make_future(async_client(*async, server_socket)); + f1.wait(); + f2.wait(); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/vespa/vespalib/CMakeLists.txt b/vespalib/src/vespa/vespalib/CMakeLists.txt index 024d61b89e5..9e31084c067 100644 --- a/vespalib/src/vespa/vespalib/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_library(vespalib SOURCES $<TARGET_OBJECTS:vespalib_vespalib_btree> $<TARGET_OBJECTS:vespalib_vespalib_component> + $<TARGET_OBJECTS:vespalib_vespalib_coro> $<TARGET_OBJECTS:vespalib_vespalib_crypto> $<TARGET_OBJECTS:vespalib_vespalib_data> $<TARGET_OBJECTS:vespalib_vespalib_data_slime> diff --git a/vespalib/src/vespa/vespalib/coro/CMakeLists.txt b/vespalib/src/vespa/vespalib/coro/CMakeLists.txt index d190c2e8ddc..0fbb94e8255 100644 --- a/vespalib/src/vespa/vespalib/coro/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/coro/CMakeLists.txt @@ -1,5 +1,6 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(vespalib_vespalib_coro OBJECT SOURCES + async_io.cpp DEPENDS ) diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp new file mode 100644 index 00000000000..406debbe1f9 --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp @@ -0,0 +1,230 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "async_io.h" +#include "detached.h" +#include <vespa/vespalib/net/selector.h> +#include <vespa/vespalib/util/require.h> + +#include <atomic> +#include <vector> +#include <map> +#include <set> + +namespace vespalib::coro { + +namespace { + +using Handle = std::coroutine_handle<>; + +template <typename F> +struct await_void { + bool ready; + F on_suspend; + await_void(bool ready_in, F on_suspend_in) + : ready(ready_in), on_suspend(on_suspend_in) {} + bool await_ready() const noexcept { return ready; } + auto await_suspend(Handle handle) const { return on_suspend(handle); } + constexpr void await_resume() noexcept {} +}; +template <typename F> +await_void(bool ready_in, F on_suspend_in) -> await_void<F>; + +struct SelectorThread : AsyncIo { + + struct FdContext { + int _fd; + bool _epoll_read; + bool _epoll_write; + Handle _reader; + Handle _writer; + FdContext(int fd_in) + : _fd(fd_in), + _epoll_read(false), _epoll_write(false), + _reader(nullptr), _writer(nullptr) {} + }; + std::map<int,FdContext> _state; + std::set<int> _check; + + Selector<FdContext> _selector; + bool _shutdown; + std::thread _thread; + bool _check_queue; + std::vector<Handle> _todo; + std::mutex _lock; + std::vector<Handle> _queue; + + SelectorThread() + : _state(), + _check(), + _selector(), + _shutdown(false), + _thread(&SelectorThread::main, this), + _check_queue(false), + _todo(), + _lock(), + _queue() {} + void main(); + ~SelectorThread(); + bool is_my_thread() const { return (std::this_thread::get_id() == _thread.get_id()); } + auto protect() { return std::lock_guard(_lock); } + auto queue_self_unless(bool ready) { + return await_void(ready, + [this](Handle handle) + { + bool need_wakeup = false; + { + auto guard = protect(); + need_wakeup = _queue.empty(); + _queue.push_back(handle); + } + if (need_wakeup) { + _selector.wakeup(); + } + }); + } + auto enter_thread() { return queue_self_unless(is_my_thread()); } + auto readable(int fd) { + REQUIRE(is_my_thread()); + return await_void((fd < 0), + [this, fd](Handle handle) + { + auto [pos, ignore] = _state.try_emplace(fd, fd); + FdContext &state = pos->second; + REQUIRE(!state._reader && "conflicting reads detected"); + state._reader = handle; + _check.insert(state._fd); + }); + } + auto writable(int fd) { + REQUIRE(is_my_thread()); + return await_void((fd < 0), + [this, fd](Handle handle) + { + auto [pos, ignore] = _state.try_emplace(fd, fd); + FdContext &state = pos->second; + REQUIRE(!state._writer && "conflicting write detected"); + state._writer = handle; + _check.insert(state._fd); + }); + } + void update_epoll_state() { + for (int fd: _check) { + auto pos = _state.find(fd); + REQUIRE(pos != _state.end()); + FdContext &ctx = pos->second; + const bool keep_entry = (ctx._reader || ctx._writer); + const bool was_added = (ctx._epoll_read || ctx._epoll_write); + if (keep_entry) { + if (was_added) { + bool read_changed = ctx._epoll_read != bool(ctx._reader); + bool write_changed = ctx._epoll_write != bool(ctx._writer); + if (read_changed || write_changed) { + _selector.update(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer)); + } + } else { + _selector.add(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer)); + } + ctx._epoll_read = bool(ctx._reader); + ctx._epoll_write = bool(ctx._writer); + } else { + if (was_added) { + _selector.remove(ctx._fd); + } + _state.erase(pos); + } + } + _check.clear(); + } + void handle_wakeup() { _check_queue = true; } + void handle_queue() { + if (!_check_queue) { + return; + } + _check_queue = false; + { + auto guard = protect(); + std::swap(_todo, _queue); + } + for (auto &&handle: _todo) { + handle.resume(); + } + _todo.clear(); + } + void handle_event(FdContext &ctx, bool read, bool write) { + _check.insert(ctx._fd); + if (read && ctx._reader) { + auto reader = std::exchange(ctx._reader, nullptr); + reader.resume(); + } + if (write && ctx._writer) { + auto writer = std::exchange(ctx._writer, nullptr); + writer.resume(); + } + } + vespalib::string get_impl_spec() override { + return "selector-thread"; + } + Lazy<SocketHandle> accept(ServerSocket &server_socket) override { + co_await enter_thread(); + co_await readable(server_socket.get_fd()); + co_return server_socket.accept(); + } + Lazy<SocketHandle> connect(const SocketAddress &addr) override { + co_await enter_thread(); + auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); }; + auto socket = addr.connect(tweak); + co_await writable(socket.get()); + co_return std::move(socket); + } + Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override { + co_await enter_thread(); + co_await readable(socket.get()); + co_return socket.read(buf, len); + } + Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override { + co_await enter_thread(); + co_await writable(socket.get()); + co_return socket.write(buf, len); + } + Work schedule() override { + co_await queue_self_unless(false); + co_return Done{}; + } + Detached shutdown() { + co_await enter_thread(); + { + auto guard = protect(); + _shutdown = true; + } + } +}; + +void +SelectorThread::main() +{ + const int ms_timeout = 100; + while (!_shutdown) { + update_epoll_state(); + _selector.poll(ms_timeout); + _selector.dispatch(*this); + handle_queue(); + } +} + +SelectorThread::~SelectorThread() { + shutdown(); + REQUIRE(!is_my_thread()); + _thread.join(); +} + +} + +AsyncIo::~AsyncIo() = default; +AsyncIo::AsyncIo() = default; + +std::shared_ptr<AsyncIo> +AsyncIo::create() { + return std::make_shared<SelectorThread>(); +} + +} diff --git a/vespalib/src/vespa/vespalib/coro/async_io.h b/vespalib/src/vespa/vespalib/coro/async_io.h new file mode 100644 index 00000000000..91449780789 --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/async_io.h @@ -0,0 +1,46 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "lazy.h" + +#include <memory> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/net/socket_handle.h> +#include <vespa/vespalib/net/server_socket.h> + +namespace vespalib::coro { + +// Interfaces defining functions used to perform async io. The initial +// implementation will perform epoll in a single dedicated thread. The +// idea is to be able to switch to an implementation using io_uring +// some time in the future without having to change existing client +// code. + +struct AsyncIo : std::enable_shared_from_this<AsyncIo> { + // these objects should not be copied around + AsyncIo(const AsyncIo &) = delete; + AsyncIo(AsyncIo &&) = delete; + AsyncIo &operator=(const AsyncIo &) = delete; + AsyncIo &operator=(AsyncIo &&) = delete; + virtual ~AsyncIo(); + + // create an async_io 'runtime' with the default implementation + static std::shared_ptr<AsyncIo> create(); + + // implementation tag + virtual vespalib::string get_impl_spec() = 0; + + // api for async io used by coroutines + virtual Lazy<SocketHandle> accept(ServerSocket &server_socket) = 0; + virtual Lazy<SocketHandle> connect(const SocketAddress &addr) = 0; + virtual Lazy<ssize_t> read(SocketHandle &handle, char *buf, size_t len) = 0; + virtual Lazy<ssize_t> write(SocketHandle &handle, const char *buf, size_t len) = 0; + virtual Work schedule() = 0; + +protected: + // may only be created via subclass + AsyncIo(); +}; + +} diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h index 2ff6c65ca34..974968d0c77 100644 --- a/vespalib/src/vespa/vespalib/coro/lazy.h +++ b/vespalib/src/vespa/vespalib/coro/lazy.h @@ -98,4 +98,8 @@ public: template<std::movable T> Lazy<T>::promise_type::~promise_type() = default; +// signal the completion of work without any result value +struct Done {}; +using Work = Lazy<Done>; + } |