aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2022-12-05 11:14:10 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2022-12-19 10:59:33 +0000
commit40364090a3642888db15c9e2e7539a5098d24574 (patch)
tree53325ee1498b5c31df6f3d620029939cca5c3db1 /vespalib
parent7b265f56441a2c6107e01e78082d3f1f26883b0b (diff)
async io experiments
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/tests/coro/async_io/CMakeLists.txt9
-rw-r--r--vespalib/src/tests/coro/async_io/async_io_test.cpp112
-rw-r--r--vespalib/src/vespa/vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/coro/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_io.cpp230
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_io.h46
-rw-r--r--vespalib/src/vespa/vespalib/coro/lazy.h4
8 files changed, 404 insertions, 0 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index 50cbe8879b5..a3d5054973f 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -44,6 +44,7 @@ vespa_define_module(
src/tests/component
src/tests/compress
src/tests/compression
+ src/tests/coro/async_io
src/tests/coro/detached
src/tests/coro/generator
src/tests/coro/lazy
diff --git a/vespalib/src/tests/coro/async_io/CMakeLists.txt b/vespalib/src/tests/coro/async_io/CMakeLists.txt
new file mode 100644
index 00000000000..25274198e9a
--- /dev/null
+++ b/vespalib/src/tests/coro/async_io/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_async_io_test_app TEST
+ SOURCES
+ async_io_test.cpp
+ DEPENDS
+ vespalib
+ GTest::GTest
+)
+vespa_add_test(NAME vespalib_async_io_test_app COMMAND vespalib_async_io_test_app)
diff --git a/vespalib/src/tests/coro/async_io/async_io_test.cpp b/vespalib/src/tests/coro/async_io/async_io_test.cpp
new file mode 100644
index 00000000000..9b8a98a8f69
--- /dev/null
+++ b/vespalib/src/tests/coro/async_io/async_io_test.cpp
@@ -0,0 +1,112 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/coro/lazy.h>
+#include <vespa/vespalib/coro/completion.h>
+#include <vespa/vespalib/coro/async_io.h>
+#include <vespa/vespalib/net/socket_spec.h>
+#include <vespa/vespalib/net/server_socket.h>
+#include <vespa/vespalib/net/socket_handle.h>
+#include <vespa/vespalib/net/socket_address.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace vespalib;
+using namespace vespalib::coro;
+
+Work run_loop(AsyncIo &async, int a, int b) {
+ for (int i = a; i < b; ++i) {
+ co_await async.schedule();
+ fprintf(stderr, "run_loop [%d,%d> -> current value: %d\n", a, b, i);
+ }
+ co_return Done{};
+}
+
+TEST(AsyncIoTest, create_async_io) {
+ auto async = AsyncIo::create();
+ ASSERT_TRUE(async);
+ fprintf(stderr, "async_io impl: %s\n", async->get_impl_spec().c_str());
+}
+
+TEST(AsyncIoTest, run_stuff_in_async_io_context) {
+ auto async = AsyncIo::create();
+ auto f1 = make_future(run_loop(*async, 10, 20));
+ auto f2 = make_future(run_loop(*async, 20, 30));
+ auto f3 = make_future(run_loop(*async, 30, 40));
+ f1.wait();
+ f2.wait();
+ f3.wait();
+}
+
+Lazy<size_t> write_msg(AsyncIo &async, SocketHandle &socket, const vespalib::string &msg) {
+ size_t written = 0;
+ while (written < msg.size()) {
+ size_t write_size = (msg.size() - written);
+ ssize_t write_result = co_await async.write(socket, msg.data() + written, write_size);
+ if (write_result <= 0) {
+ co_return written;
+ }
+ written += write_result;
+ }
+ co_return written;
+}
+
+Lazy<vespalib::string> read_msg(AsyncIo &async, SocketHandle &socket, size_t wanted_bytes) {
+ char tmp[64];
+ vespalib::string result;
+ while (result.size() < wanted_bytes) {
+ size_t read_size = std::min(sizeof(tmp), wanted_bytes - result.size());
+ ssize_t read_result = co_await async.read(socket, tmp, read_size);
+ if (read_result <= 0) {
+ co_return result;
+ }
+ result.append(tmp, read_result);
+ }
+ co_return result;
+}
+
+Work verify_socket_io(AsyncIo &async, SocketHandle &socket, bool is_server) {
+ vespalib::string server_message = "hello, this is the server speaking";
+ vespalib::string client_message = "please pick up, I need to talk to you";
+ if (is_server) {
+ vespalib::string read = co_await read_msg(async, socket, client_message.size());
+ EXPECT_EQ(client_message, read);
+ size_t written = co_await write_msg(async, socket, server_message);
+ EXPECT_EQ(written, ssize_t(server_message.size()));
+ } else {
+ size_t written = co_await write_msg(async, socket, client_message);
+ EXPECT_EQ(written, ssize_t(client_message.size()));
+ vespalib::string read = co_await read_msg(async, socket, server_message.size());
+ EXPECT_EQ(server_message, read);
+ }
+ co_return Done{};
+}
+
+Work async_server(AsyncIo &async, ServerSocket &server_socket) {
+ auto server_addr = server_socket.address();
+ auto server_spec = server_addr.spec();
+ fprintf(stderr, "listening at '%s' (fd = %d)\n", server_spec.c_str(), server_socket.get_fd());
+ auto socket = co_await async.accept(server_socket);
+ fprintf(stderr, "server fd: %d\n", socket.get());
+ co_return co_await verify_socket_io(async, socket, true);
+}
+
+Work async_client(AsyncIo &async, ServerSocket &server_socket) {
+ auto server_addr = server_socket.address();
+ auto server_spec = server_addr.spec();
+ fprintf(stderr, "connecting to '%s'\n", server_spec.c_str());
+ auto client_addr = SocketSpec(server_spec).client_address();
+ auto socket = co_await async.connect(client_addr);
+ fprintf(stderr, "client fd: %d\n", socket.get());
+ co_return co_await verify_socket_io(async, socket, false);
+}
+
+TEST(AsyncIoTest, raw_socket_io) {
+ ServerSocket server_socket("tcp/0");
+ server_socket.set_blocking(false);
+ auto async = AsyncIo::create();
+ auto f1 = make_future(async_server(*async, server_socket));
+ auto f2 = make_future(async_client(*async, server_socket));
+ f1.wait();
+ f2.wait();
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/vespa/vespalib/CMakeLists.txt b/vespalib/src/vespa/vespalib/CMakeLists.txt
index 024d61b89e5..9e31084c067 100644
--- a/vespalib/src/vespa/vespalib/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/CMakeLists.txt
@@ -3,6 +3,7 @@ vespa_add_library(vespalib
SOURCES
$<TARGET_OBJECTS:vespalib_vespalib_btree>
$<TARGET_OBJECTS:vespalib_vespalib_component>
+ $<TARGET_OBJECTS:vespalib_vespalib_coro>
$<TARGET_OBJECTS:vespalib_vespalib_crypto>
$<TARGET_OBJECTS:vespalib_vespalib_data>
$<TARGET_OBJECTS:vespalib_vespalib_data_slime>
diff --git a/vespalib/src/vespa/vespalib/coro/CMakeLists.txt b/vespalib/src/vespa/vespalib/coro/CMakeLists.txt
index d190c2e8ddc..0fbb94e8255 100644
--- a/vespalib/src/vespa/vespalib/coro/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/coro/CMakeLists.txt
@@ -1,5 +1,6 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(vespalib_vespalib_coro OBJECT
SOURCES
+ async_io.cpp
DEPENDS
)
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp
new file mode 100644
index 00000000000..406debbe1f9
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp
@@ -0,0 +1,230 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "async_io.h"
+#include "detached.h"
+#include <vespa/vespalib/net/selector.h>
+#include <vespa/vespalib/util/require.h>
+
+#include <atomic>
+#include <vector>
+#include <map>
+#include <set>
+
+namespace vespalib::coro {
+
+namespace {
+
+using Handle = std::coroutine_handle<>;
+
+template <typename F>
+struct await_void {
+ bool ready;
+ F on_suspend;
+ await_void(bool ready_in, F on_suspend_in)
+ : ready(ready_in), on_suspend(on_suspend_in) {}
+ bool await_ready() const noexcept { return ready; }
+ auto await_suspend(Handle handle) const { return on_suspend(handle); }
+ constexpr void await_resume() noexcept {}
+};
+template <typename F>
+await_void(bool ready_in, F on_suspend_in) -> await_void<F>;
+
+struct SelectorThread : AsyncIo {
+
+ struct FdContext {
+ int _fd;
+ bool _epoll_read;
+ bool _epoll_write;
+ Handle _reader;
+ Handle _writer;
+ FdContext(int fd_in)
+ : _fd(fd_in),
+ _epoll_read(false), _epoll_write(false),
+ _reader(nullptr), _writer(nullptr) {}
+ };
+ std::map<int,FdContext> _state;
+ std::set<int> _check;
+
+ Selector<FdContext> _selector;
+ bool _shutdown;
+ std::thread _thread;
+ bool _check_queue;
+ std::vector<Handle> _todo;
+ std::mutex _lock;
+ std::vector<Handle> _queue;
+
+ SelectorThread()
+ : _state(),
+ _check(),
+ _selector(),
+ _shutdown(false),
+ _thread(&SelectorThread::main, this),
+ _check_queue(false),
+ _todo(),
+ _lock(),
+ _queue() {}
+ void main();
+ ~SelectorThread();
+ bool is_my_thread() const { return (std::this_thread::get_id() == _thread.get_id()); }
+ auto protect() { return std::lock_guard(_lock); }
+ auto queue_self_unless(bool ready) {
+ return await_void(ready,
+ [this](Handle handle)
+ {
+ bool need_wakeup = false;
+ {
+ auto guard = protect();
+ need_wakeup = _queue.empty();
+ _queue.push_back(handle);
+ }
+ if (need_wakeup) {
+ _selector.wakeup();
+ }
+ });
+ }
+ auto enter_thread() { return queue_self_unless(is_my_thread()); }
+ auto readable(int fd) {
+ REQUIRE(is_my_thread());
+ return await_void((fd < 0),
+ [this, fd](Handle handle)
+ {
+ auto [pos, ignore] = _state.try_emplace(fd, fd);
+ FdContext &state = pos->second;
+ REQUIRE(!state._reader && "conflicting reads detected");
+ state._reader = handle;
+ _check.insert(state._fd);
+ });
+ }
+ auto writable(int fd) {
+ REQUIRE(is_my_thread());
+ return await_void((fd < 0),
+ [this, fd](Handle handle)
+ {
+ auto [pos, ignore] = _state.try_emplace(fd, fd);
+ FdContext &state = pos->second;
+ REQUIRE(!state._writer && "conflicting write detected");
+ state._writer = handle;
+ _check.insert(state._fd);
+ });
+ }
+ void update_epoll_state() {
+ for (int fd: _check) {
+ auto pos = _state.find(fd);
+ REQUIRE(pos != _state.end());
+ FdContext &ctx = pos->second;
+ const bool keep_entry = (ctx._reader || ctx._writer);
+ const bool was_added = (ctx._epoll_read || ctx._epoll_write);
+ if (keep_entry) {
+ if (was_added) {
+ bool read_changed = ctx._epoll_read != bool(ctx._reader);
+ bool write_changed = ctx._epoll_write != bool(ctx._writer);
+ if (read_changed || write_changed) {
+ _selector.update(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer));
+ }
+ } else {
+ _selector.add(ctx._fd, ctx, bool(ctx._reader), bool(ctx._writer));
+ }
+ ctx._epoll_read = bool(ctx._reader);
+ ctx._epoll_write = bool(ctx._writer);
+ } else {
+ if (was_added) {
+ _selector.remove(ctx._fd);
+ }
+ _state.erase(pos);
+ }
+ }
+ _check.clear();
+ }
+ void handle_wakeup() { _check_queue = true; }
+ void handle_queue() {
+ if (!_check_queue) {
+ return;
+ }
+ _check_queue = false;
+ {
+ auto guard = protect();
+ std::swap(_todo, _queue);
+ }
+ for (auto &&handle: _todo) {
+ handle.resume();
+ }
+ _todo.clear();
+ }
+ void handle_event(FdContext &ctx, bool read, bool write) {
+ _check.insert(ctx._fd);
+ if (read && ctx._reader) {
+ auto reader = std::exchange(ctx._reader, nullptr);
+ reader.resume();
+ }
+ if (write && ctx._writer) {
+ auto writer = std::exchange(ctx._writer, nullptr);
+ writer.resume();
+ }
+ }
+ vespalib::string get_impl_spec() override {
+ return "selector-thread";
+ }
+ Lazy<SocketHandle> accept(ServerSocket &server_socket) override {
+ co_await enter_thread();
+ co_await readable(server_socket.get_fd());
+ co_return server_socket.accept();
+ }
+ Lazy<SocketHandle> connect(const SocketAddress &addr) override {
+ co_await enter_thread();
+ auto tweak = [](SocketHandle &handle){ return handle.set_blocking(false); };
+ auto socket = addr.connect(tweak);
+ co_await writable(socket.get());
+ co_return std::move(socket);
+ }
+ Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override {
+ co_await enter_thread();
+ co_await readable(socket.get());
+ co_return socket.read(buf, len);
+ }
+ Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override {
+ co_await enter_thread();
+ co_await writable(socket.get());
+ co_return socket.write(buf, len);
+ }
+ Work schedule() override {
+ co_await queue_self_unless(false);
+ co_return Done{};
+ }
+ Detached shutdown() {
+ co_await enter_thread();
+ {
+ auto guard = protect();
+ _shutdown = true;
+ }
+ }
+};
+
+void
+SelectorThread::main()
+{
+ const int ms_timeout = 100;
+ while (!_shutdown) {
+ update_epoll_state();
+ _selector.poll(ms_timeout);
+ _selector.dispatch(*this);
+ handle_queue();
+ }
+}
+
+SelectorThread::~SelectorThread() {
+ shutdown();
+ REQUIRE(!is_my_thread());
+ _thread.join();
+}
+
+}
+
+AsyncIo::~AsyncIo() = default;
+AsyncIo::AsyncIo() = default;
+
+std::shared_ptr<AsyncIo>
+AsyncIo::create() {
+ return std::make_shared<SelectorThread>();
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.h b/vespalib/src/vespa/vespalib/coro/async_io.h
new file mode 100644
index 00000000000..91449780789
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/async_io.h
@@ -0,0 +1,46 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "lazy.h"
+
+#include <memory>
+#include <vespa/vespalib/stllike/string.h>
+#include <vespa/vespalib/net/socket_handle.h>
+#include <vespa/vespalib/net/server_socket.h>
+
+namespace vespalib::coro {
+
+// Interfaces defining functions used to perform async io. The initial
+// implementation will perform epoll in a single dedicated thread. The
+// idea is to be able to switch to an implementation using io_uring
+// some time in the future without having to change existing client
+// code.
+
+struct AsyncIo : std::enable_shared_from_this<AsyncIo> {
+ // these objects should not be copied around
+ AsyncIo(const AsyncIo &) = delete;
+ AsyncIo(AsyncIo &&) = delete;
+ AsyncIo &operator=(const AsyncIo &) = delete;
+ AsyncIo &operator=(AsyncIo &&) = delete;
+ virtual ~AsyncIo();
+
+ // create an async_io 'runtime' with the default implementation
+ static std::shared_ptr<AsyncIo> create();
+
+ // implementation tag
+ virtual vespalib::string get_impl_spec() = 0;
+
+ // api for async io used by coroutines
+ virtual Lazy<SocketHandle> accept(ServerSocket &server_socket) = 0;
+ virtual Lazy<SocketHandle> connect(const SocketAddress &addr) = 0;
+ virtual Lazy<ssize_t> read(SocketHandle &handle, char *buf, size_t len) = 0;
+ virtual Lazy<ssize_t> write(SocketHandle &handle, const char *buf, size_t len) = 0;
+ virtual Work schedule() = 0;
+
+protected:
+ // may only be created via subclass
+ AsyncIo();
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h
index 2ff6c65ca34..974968d0c77 100644
--- a/vespalib/src/vespa/vespalib/coro/lazy.h
+++ b/vespalib/src/vespa/vespalib/coro/lazy.h
@@ -98,4 +98,8 @@ public:
template<std::movable T>
Lazy<T>::promise_type::~promise_type() = default;
+// signal the completion of work without any result value
+struct Done {};
+using Work = Lazy<Done>;
+
}