summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-01-25 11:58:47 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-01-31 14:51:39 +0000
commit19e3a1addbc4cbd7666fe1842db2fb22bba491ff (patch)
treea06729e7176143997639699bdb7c80d0778163b2
parent3e54969fc961ee51c93404a37d559ab7ea2f9fe6 (diff)
io uring async io implementation
-rw-r--r--vespalib/src/tests/coro/async_io/async_io_test.cpp26
-rw-r--r--vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp43
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_io.cpp35
-rw-r--r--vespalib/src/vespa/vespalib/coro/async_io.h9
-rw-r--r--vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp295
-rw-r--r--vespalib/src/vespa/vespalib/coro/waiting_for.h70
-rw-r--r--vespalib/src/vespa/vespalib/net/socket_address.cpp45
-rw-r--r--vespalib/src/vespa/vespalib/net/socket_address.h3
8 files changed, 460 insertions, 66 deletions
diff --git a/vespalib/src/tests/coro/async_io/async_io_test.cpp b/vespalib/src/tests/coro/async_io/async_io_test.cpp
index f5098e30086..2e783c46e6f 100644
--- a/vespalib/src/tests/coro/async_io/async_io_test.cpp
+++ b/vespalib/src/tests/coro/async_io/async_io_test.cpp
@@ -21,6 +21,14 @@ using namespace vespalib;
using namespace vespalib::coro;
using namespace vespalib::test;
+vespalib::string impl_spec(AsyncIo &async) {
+ switch (async.get_impl_tag()) {
+ case AsyncIo::ImplTag::EPOLL: return "epoll";
+ case AsyncIo::ImplTag::URING: return "uring";
+ }
+ abort();
+}
+
Detached self_exiting_run_loop(AsyncIo::SP async) {
for (size_t i = 0; co_await async->schedule(); ++i) {
fprintf(stderr, "self_exiting_run_loop -> current value: %zu\n", i);
@@ -39,7 +47,7 @@ Work run_loop(AsyncIo &async, int a, int b) {
TEST(AsyncIoTest, create_async_io) {
auto async = AsyncIo::create();
AsyncIo &api = async;
- fprintf(stderr, "async_io impl: %s\n", api.get_impl_spec().c_str());
+ fprintf(stderr, "async_io impl: %s\n", impl_spec(api).c_str());
}
TEST(AsyncIoTest, run_stuff_in_async_io_context) {
@@ -132,10 +140,12 @@ Work async_client(AsyncIo &async, CryptoEngine &engine, ServerSocket &server_soc
co_return co_await verify_socket_io(*socket, false);
}
-void verify_socket_io(CryptoEngine &engine) {
+void verify_socket_io(CryptoEngine &engine, AsyncIo::ImplTag prefer_impl = AsyncIo::ImplTag::EPOLL) {
ServerSocket server_socket("tcp/0");
server_socket.set_blocking(false);
- auto async = AsyncIo::create();
+ auto async = AsyncIo::create(prefer_impl);
+ fprintf(stderr, "verify_socket_io: crypto engine: %s, async impl: %s\n",
+ getClassName(engine).c_str(), impl_spec(async).c_str());
auto f1 = make_future(async_server(async, engine, server_socket));
auto f2 = make_future(async_client(async, engine, server_socket));
(void) f1.get();
@@ -162,4 +172,14 @@ TEST(AsyncIoTest, maybe_tls_false_socket_io) {
verify_socket_io(engine);
}
+TEST(AsyncIoTest, raw_socket_io_with_io_uring_maybe) {
+ NullCryptoEngine engine;
+ verify_socket_io(engine, AsyncIo::ImplTag::URING);
+}
+
+TEST(AsyncIoTest, tls_socket_io_with_io_uring_maybe) {
+ TlsCryptoEngine engine(make_tls_options_for_testing());
+ verify_socket_io(engine, AsyncIo::ImplTag::URING);
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp b/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp
index 385d4ad24e3..c6678657c34 100644
--- a/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp
+++ b/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp
@@ -11,29 +11,29 @@ using namespace vespalib::coro;
struct AsyncService {
std::vector<WaitingFor<int>> pending;
auto get_value() {
- return awaiter_for<int>([&](WaitingFor<int> handle)
- {
- pending.push_back(std::move(handle));
- });
+ return wait_for<int>([&](WaitingFor<int> handle)
+ {
+ pending.push_back(std::move(handle));
+ });
}
};
struct AsyncVoidService {
std::vector<void*> pending;
auto get_value() {
- return awaiter_for<int>([&](WaitingFor<int> handle)
- {
- pending.push_back(handle.release());
- });
+ return wait_for<int>([&](WaitingFor<int> handle)
+ {
+ pending.push_back(handle.release());
+ });
}
};
struct SyncService {
auto get_value() {
- return awaiter_for<int>([](WaitingFor<int> handle)
+ return wait_for<int>([](WaitingFor<int> handle)
{
handle.set_value(42);
- return handle.release_waiter(); // symmetric transfer
+ return handle.mu(); // symmetric transfer
});
}
};
@@ -45,8 +45,8 @@ Lazy<int> wait_for_value(Service &service) {
}
template <typename T>
-Lazy<T> wait_any(auto &&fun) {
- T result = co_await fun();
+Lazy<T> wait_for_fun(auto &&fun) {
+ T result = co_await wait_for<T>(fun);
co_return std::move(result);
}
@@ -62,6 +62,23 @@ TEST(WaitingForTest, wait_for_external_async_int) {
EXPECT_EQ(res.get(), 42);
}
+TEST(WaitingForTest, wait_for_external_async_int_calculated_by_coroutine) {
+ AsyncService service1;
+ AsyncService service2;
+ auto res = make_future(wait_for_value(service1));
+ ASSERT_EQ(service1.pending.size(), 1);
+ {
+ async_wait(wait_for_value(service2), std::move(service1.pending[0]));
+ service1.pending.clear();
+ }
+ EXPECT_TRUE(res.wait_for(0ms) == std::future_status::timeout);
+ ASSERT_EQ(service2.pending.size(), 1);
+ service2.pending[0].set_value(42);
+ service2.pending.clear();
+ EXPECT_TRUE(res.wait_for(0ms) == std::future_status::ready);
+ EXPECT_EQ(res.get(), 42);
+}
+
TEST(WaitingForTest, wait_for_external_async_int_via_void_ptr) {
AsyncVoidService service;
auto res = make_future(wait_for_value(service));
@@ -86,7 +103,7 @@ TEST(WaitingForTest, wait_for_external_sync_int) {
TEST(WaitingForTest, wait_for_move_only_value) {
auto val = std::make_unique<int>(42);
auto fun = [&val](auto handle){ handle.set_value(std::move(val)); }; // asymmetric transfer
- auto res = make_future(wait_any<decltype(val)>([&fun](){ return awaiter_for<decltype(val)>(fun); }));
+ auto res = make_future(wait_for_fun<decltype(val)>(fun));
EXPECT_TRUE(res.wait_for(0ms) == std::future_status::ready);
EXPECT_EQ(*res.get(), 42);
}
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp
index 8d628fd7887..86eddfbd035 100644
--- a/vespalib/src/vespa/vespalib/coro/async_io.cpp
+++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp
@@ -4,12 +4,28 @@
#include "detached.h"
#include <vespa/vespalib/net/selector.h>
#include <vespa/vespalib/util/require.h>
+#include <vespa/vespalib/util/time.h>
+#include <vespa/config.h>
+#include <thread>
#include <atomic>
#include <vector>
#include <map>
#include <set>
+#ifdef VESPA_HAS_IO_URING
+#include "io_uring_thread.hpp"
+namespace {
+bool can_use_io_uring() { return vespalib::coro::UringProbe::check_support(); }
+vespalib::coro::AsyncIo::SP create_io_uring_thread() { return std::make_shared<vespalib::coro::IoUringThread>(); }
+}
+#else
+namespace {
+bool can_use_io_uring() { return false; }
+vespalib::coro::AsyncIo::SP create_io_uring_thread() { abort(); }
+}
+#endif
+
namespace vespalib::coro {
namespace {
@@ -195,15 +211,17 @@ struct SelectorThread : AsyncIo {
writer.resume();
}
}
- vespalib::string get_impl_spec() override {
- return "selector-thread";
- }
+ ImplTag get_impl_tag() override { return ImplTag::EPOLL; }
Lazy<SocketHandle> accept(ServerSocket &server_socket) override {
bool in_thread = co_await enter_thread();
if (in_thread) {
bool can_read = co_await readable(server_socket.get_fd());
if (can_read) {
- co_return server_socket.accept();
+ auto res = server_socket.accept();
+ if (res.valid()) {
+ res.set_blocking(false);
+ }
+ co_return res;
}
}
co_return SocketHandle(-ECANCELED);
@@ -325,7 +343,7 @@ SelectorThread::~SelectorThread()
REQUIRE(_queue.empty());
}
-}
+} // <unnamed>
AsyncIo::~AsyncIo() = default;
AsyncIo::AsyncIo() = default;
@@ -367,8 +385,11 @@ AsyncIo::Owner::~Owner()
}
AsyncIo::Owner
-AsyncIo::create() {
+AsyncIo::create(ImplTag prefer_impl) {
+ if (prefer_impl == ImplTag::URING && can_use_io_uring()) {
+ return Owner(create_io_uring_thread());
+ }
return Owner(std::make_shared<SelectorThread>());
}
-}
+} // vespalib::coro
diff --git a/vespalib/src/vespa/vespalib/coro/async_io.h b/vespalib/src/vespa/vespalib/coro/async_io.h
index 56d2aae7fdf..285821fef85 100644
--- a/vespalib/src/vespa/vespalib/coro/async_io.h
+++ b/vespalib/src/vespa/vespalib/coro/async_io.h
@@ -26,6 +26,10 @@ struct AsyncIo : std::enable_shared_from_this<AsyncIo> {
virtual ~AsyncIo();
using SP = std::shared_ptr<AsyncIo>;
+ // tag used to separate implementations
+ enum class ImplTag { EPOLL, URING };
+ static constexpr ImplTag default_impl() { return ImplTag::EPOLL; }
+
// thin wrapper used by the owner to handle lifetime
class Owner {
private:
@@ -46,10 +50,11 @@ struct AsyncIo : std::enable_shared_from_this<AsyncIo> {
};
// create an async_io 'runtime'
- static Owner create();
+ // note that the preferred implementation may not be available
+ static Owner create(ImplTag prefer_impl = default_impl());
// implementation tag
- virtual vespalib::string get_impl_spec() = 0;
+ virtual ImplTag get_impl_tag() = 0;
// api for async io used by coroutines
virtual Lazy<SocketHandle> accept(ServerSocket &server_socket) = 0;
diff --git a/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp b/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp
new file mode 100644
index 00000000000..143e65d02d4
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp
@@ -0,0 +1,295 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+// this file is included by async_io.cpp if VESPA_HAS_IO_URING is defined
+
+#include <liburing.h>
+#include <liburing/io_uring.h>
+#include <sys/eventfd.h>
+
+namespace vespalib::coro {
+namespace {
+
+using Handle = std::coroutine_handle<>;
+using cqe_res_t = int32_t;
+
+// Server sockets are always non-blocking. We need to temporarily set
+// them to blocking mode to avoid that async accept return -EAGAIN.
+struct BlockingGuard {
+ int fd;
+ BlockingGuard(int fd_in) : fd(fd_in) {
+ SocketOptions::set_blocking(fd, true);
+ }
+ ~BlockingGuard() {
+ SocketOptions::set_blocking(fd, false);
+ }
+};
+
+struct UringProbe {
+ io_uring_probe *probe;
+ UringProbe() : probe(io_uring_get_probe()) {}
+ ~UringProbe() { free(probe); }
+ bool check(int opcode) {
+ return probe && io_uring_opcode_supported(probe, opcode);
+ }
+ static bool check_support() {
+ UringProbe probe;
+ return probe.check(IORING_OP_ACCEPT)
+ && probe.check(IORING_OP_CONNECT)
+ && probe.check(IORING_OP_READ)
+ && probe.check(IORING_OP_WRITE);
+ }
+};
+
+struct Uring {
+ io_uring uring;
+ size_t pending;
+ Uring() : pending(0) {
+ int res = io_uring_queue_init(4096, &uring, 0);
+ REQUIRE_EQ(res, 0);
+ }
+ auto *get_sqe() {
+ auto *res = io_uring_get_sqe(&uring);
+ while (res == nullptr) {
+ auto submit_res = io_uring_submit(&uring);
+ REQUIRE(submit_res >= 0);
+ res = io_uring_get_sqe(&uring);
+ }
+ ++pending;
+ return res;
+ }
+ void submit_and_dispatch() {
+ auto res = io_uring_submit_and_wait(&uring, 1);
+ REQUIRE(res >= 0);
+ io_uring_cqe *cqe = nullptr;
+ while (io_uring_peek_cqe(&uring, &cqe) == 0) {
+ auto wf = WaitingFor<cqe_res_t>::from_pointer(io_uring_cqe_get_data(cqe));
+ wf.set_value(cqe->res);
+ io_uring_cqe_seen(&uring, cqe);
+ --pending;
+ }
+ }
+ void drain_pending() {
+ while (pending > 0) {
+ auto res = io_uring_submit_and_wait(&uring, 1);
+ REQUIRE(res >= 0);
+ io_uring_cqe *cqe = nullptr;
+ while (io_uring_peek_cqe(&uring, &cqe) == 0) {
+ auto wf = WaitingFor<cqe_res_t>::from_pointer(io_uring_cqe_get_data(cqe));
+ wf.set_value(-ECANCELED);
+ io_uring_cqe_seen(&uring, cqe);
+ --pending;
+ }
+ }
+ }
+ ~Uring() {
+ REQUIRE_EQ(pending, 0u);
+ io_uring_queue_exit(&uring);
+ }
+};
+
+auto wait_for_sqe(auto *sqe) {
+ return wait_for<cqe_res_t>([sqe](auto wf)
+ {
+ io_uring_sqe_set_data(sqe, wf.release());
+ });
+}
+
+struct IoUringThread : AsyncIo {
+
+ struct RunGuard;
+ using ThreadId = std::atomic<std::thread::id>;
+ using RunOp = WaitingFor<bool>;
+
+ Uring _uring;
+ SocketHandle _event;
+ std::thread _thread;
+ ThreadId _thread_id;
+ std::vector<RunOp> _todo;
+ std::mutex _lock;
+ std::vector<RunOp> _queue;
+
+ IoUringThread()
+ : _uring(),
+ _event(eventfd(0, 0)),
+ _thread(),
+ _thread_id(std::thread::id()),
+ _todo(),
+ _lock(),
+ _queue()
+ {
+ static_assert(ThreadId::is_always_lock_free);
+ REQUIRE(_event.valid());
+ }
+ void start() override;
+ void main();
+ void init_shutdown() override;
+ void fini_shutdown() override;
+ ~IoUringThread();
+ bool running() const noexcept {
+ return (_thread_id.load(std::memory_order_relaxed) != std::thread::id());
+ }
+ bool stopped() const noexcept {
+ return (_thread_id.load(std::memory_order_relaxed) == std::thread::id());
+ }
+ bool in_thread() const noexcept {
+ return (std::this_thread::get_id() == _thread_id.load(std::memory_order_relaxed));
+ }
+ auto protect() { return std::lock_guard(_lock); }
+ void wakeup() {
+ uint64_t value = 1;
+ int res = ::write(_event.get(), &value, sizeof(value));
+ REQUIRE_EQ(res, int(sizeof(value)));
+ }
+ auto async_run() {
+ return wait_for<bool>([this](auto wf)
+ ->std::coroutine_handle<>
+ {
+ bool need_wakeup = false;
+ {
+ auto guard = protect();
+ if (stopped()) {
+ wf.set_value(false);
+ return wf.mu();
+ }
+ need_wakeup = _queue.empty();
+ _queue.push_back(std::move(wf));
+ }
+ if (need_wakeup) {
+ wakeup();
+ }
+ return std::noop_coroutine();
+ });
+ }
+ void handle_queue(bool result) {
+ {
+ auto guard = protect();
+ std::swap(_todo, _queue);
+ }
+ for (auto &&item: _todo) {
+ auto wf = std::move(item);
+ wf.set_value(result);
+ }
+ _todo.clear();
+ }
+ ImplTag get_impl_tag() override { return ImplTag::URING; }
+ Lazy<SocketHandle> accept(ServerSocket &server_socket) override {
+ BlockingGuard blocking_guard(server_socket.get_fd());
+ int res = -ECANCELED;
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
+ auto *sqe = _uring.get_sqe();
+ io_uring_prep_accept(sqe, server_socket.get_fd(), nullptr, nullptr, 0);
+ res = co_await wait_for_sqe(sqe);
+ }
+ co_return SocketHandle(res);
+ }
+ Lazy<SocketHandle> connect(const SocketAddress &addr) override {
+ SocketHandle handle = addr.raw_socket();
+ if (handle.valid()) {
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
+ auto *sqe = _uring.get_sqe();
+ io_uring_prep_connect(sqe, handle.get(), addr.raw_addr(), addr.raw_addr_len());
+ int res = co_await wait_for_sqe(sqe);
+ if (res < 0) {
+ handle.reset(res);
+ }
+ }
+ }
+ co_return handle;
+ }
+ Lazy<ssize_t> read(SocketHandle &socket, char *buf, size_t len) override {
+ ssize_t res = -ECANCELED;
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
+ auto *sqe = _uring.get_sqe();
+ io_uring_prep_read(sqe, socket.get(), buf, len, 0);
+ res = co_await wait_for_sqe(sqe);
+ }
+ co_return res;
+ }
+ Lazy<ssize_t> write(SocketHandle &socket, const char *buf, size_t len) override {
+ ssize_t res = -ECANCELED;
+ bool inside = in_thread() ? true : co_await async_run();
+ if (inside) {
+ auto *sqe = _uring.get_sqe();
+ io_uring_prep_write(sqe, socket.get(), buf, len, 0);
+ res = co_await wait_for_sqe(sqe);
+ }
+ co_return res;
+ }
+ Lazy<bool> schedule() override {
+ co_return co_await async_run();
+ }
+ Detached consume_events() {
+ uint64_t value = 0;
+ REQUIRE(in_thread());
+ int res = sizeof(value);
+ while (running() && (res == sizeof(value))) {
+ auto *sqe = _uring.get_sqe();
+ io_uring_prep_read(sqe, _event.get(), &value, sizeof(value), 0);
+ res = co_await wait_for_sqe(sqe);
+ }
+ }
+ Detached async_shutdown() {
+ bool inside = in_thread() ? true : co_await async_run();
+ REQUIRE(inside && "unable to initialize shutdown of internal thread");
+ {
+ auto guard = protect();
+ _thread_id = std::thread::id();
+ }
+ }
+};
+
+void
+IoUringThread::start()
+{
+ _thread = std::thread(&IoUringThread::main, this);
+ _thread_id.wait(std::thread::id());
+}
+
+struct IoUringThread::RunGuard {
+ IoUringThread &self;
+ RunGuard(IoUringThread &self_in) noexcept : self(self_in) {
+ self._thread_id = std::this_thread::get_id();
+ self._thread_id.notify_all();
+ self.consume_events();
+ }
+ ~RunGuard() {
+ REQUIRE(self.stopped());
+ self.wakeup();
+ self.handle_queue(false);
+ self._uring.drain_pending();
+ }
+};
+
+void
+IoUringThread::main()
+{
+ RunGuard guard(*this);
+ while (running()) {
+ _uring.submit_and_dispatch();
+ handle_queue(true);
+ }
+}
+
+void
+IoUringThread::init_shutdown()
+{
+ async_shutdown();
+}
+
+void
+IoUringThread::fini_shutdown()
+{
+ _thread.join();
+}
+
+IoUringThread::~IoUringThread()
+{
+ REQUIRE(_todo.empty());
+ REQUIRE(_queue.empty());
+}
+
+} // <unnamed>
+} // vespalib::coro
diff --git a/vespalib/src/vespa/vespalib/coro/waiting_for.h b/vespalib/src/vespa/vespalib/coro/waiting_for.h
index 2e11a9cb38c..d82d0779f41 100644
--- a/vespalib/src/vespa/vespalib/coro/waiting_for.h
+++ b/vespalib/src/vespa/vespalib/coro/waiting_for.h
@@ -48,9 +48,6 @@ public:
void set_done() {
_state->result.set_done();
}
- std::coroutine_handle<> release_waiter() {
- return std::exchange(_state->waiter, std::noop_coroutine());
- }
void *release() {
return std::exchange(_state, nullptr);
}
@@ -61,6 +58,18 @@ public:
static WaitingFor from_state(PromiseState<T> &state) {
return {&state};
}
+ // Unasking the question. This will drop the internal reference to
+ // the promise state and return the handle for the waiting
+ // coroutine. A function responsible for starting an async
+ // operation may chose to do 'wf.set_value(<result>)' followed by
+ // 'return wf.mu()' to convert the async operation to a sync
+ // operation by immediately resuming the waiting coroutine (by
+ // symmetrically transferring control to itself).
+ [[nodiscard]] std::coroutine_handle<> mu() {
+ auto handle = std::exchange(_state->waiter, std::noop_coroutine());
+ _state = nullptr;
+ return handle;
+ }
};
template <typename T>
@@ -74,35 +83,54 @@ WaitingFor<T>::~WaitingFor()
static_assert(receiver_of<WaitingFor<int>, int>);
static_assert(receiver_of<WaitingFor<std::unique_ptr<int>>, std::unique_ptr<int>>);
+// concept indicating that F is a function that starts an async
+// operation with T as result. The result will eventually be delivered
+// to the WaitingFor<T> passed to the function.
+template <typename F, typename T>
+concept start_async_op = requires(F &&f, std::decay_t<F> cpy, WaitingFor<T> wf) {
+ std::decay_t<F>(std::forward<F>(f));
+ { cpy(std::move(wf)) } -> std::same_as<void>;
+};
+
+// concept indicating that F is a function that starts an async
+// operation with T as result. The result will eventually be delivered
+// to the WaitingFor<T> passed to the function. This variant will use
+// symmetric transfer to switch to another coroutine as a side-effect
+// of starting the async operation (and thus suspending the
+// coroutine). This also enables the function to change the operation
+// form async to sync by setting the value directly in the function
+// and returning wf.mu()
+template <typename F, typename T>
+concept maybe_start_async_op = requires(F &&f, std::decay_t<F> cpy, WaitingFor<T> wf) {
+ std::decay_t<F>(std::forward<F>(f));
+ { cpy(std::move(wf)) } -> std::same_as<std::coroutine_handle<>>;
+};
+
// Create a custom awaiter that will return a value of type T when the
-// coroutine is resumed. The waiting coroutine will be represented as
-// a WaitingFor<T> that is passed as the only parameter to 'f'. The
-// return value of 'f' is returned from await_suspend, which means it
-// must be void, bool or coroutine handle. If 'f' returns a value
-// indicating that the coroutine should be resumed immediately,
-// WaitingFor<T>::release_waiter() must be called to avoid resume
-// being called as well. Note that await_ready will always return
-// false, since the coroutine needs to be suspended in order to create
-// the WaitingFor<T> object needed. Also, the WaitingFor<T> api
-// implies that the value will be set from the outside and thus cannot
-// be ready up-front. Also note that await_resume must return T by
-// value, since the awaiter containing the result is a temporary
-// object.
+// coroutine is resumed. The operation waited upon is represented by
+// the function object used to start it. Note that await_ready will
+// always return false, since the coroutine needs to be suspended in
+// order to create the WaitingFor<T> object needed. Also, the
+// WaitingFor<T> api implies that the value will be set from the
+// outside and thus cannot be ready up-front. Also note that
+// await_resume must return T by value, since the awaiter containing
+// the result is a temporary object.
template <typename T, typename F>
-auto awaiter_for(F &&f) {
+requires start_async_op<F,T> || maybe_start_async_op<F,T>
+auto wait_for(F &&on_suspend) {
struct awaiter final : PromiseState<T> {
using PromiseState<T>::result;
using PromiseState<T>::waiter;
- std::decay_t<F> fun;
- awaiter(F &&f) : PromiseState<T>(), fun(std::forward<F>(f)) {}
+ std::decay_t<F> on_suspend;
+ awaiter(F &&on_suspend_in) : PromiseState<T>(), on_suspend(std::forward<F>(on_suspend_in)) {}
bool await_ready() const noexcept { return false; }
T await_resume() { return std::move(result).get_value(); }
decltype(auto) await_suspend(std::coroutine_handle<> handle) __attribute__((noinline)) {
waiter = handle;
- return fun(WaitingFor<T>::from_state(*this));
+ return on_suspend(WaitingFor<T>::from_state(*this));
}
};
- return awaiter(std::forward<F>(f));
+ return awaiter(std::forward<F>(on_suspend));
}
}
diff --git a/vespalib/src/vespa/vespalib/net/socket_address.cpp b/vespalib/src/vespa/vespalib/net/socket_address.cpp
index c1fb1cb0858..4c552b03de7 100644
--- a/vespalib/src/vespa/vespalib/net/socket_address.cpp
+++ b/vespalib/src/vespa/vespalib/net/socket_address.cpp
@@ -152,14 +152,21 @@ SocketAddress::spec() const
}
SocketHandle
-SocketAddress::connect(const std::function<bool(SocketHandle&)> &tweak) const
+SocketAddress::raw_socket() const
{
if (valid()) {
- SocketHandle handle(socket(_addr.ss_family, SOCK_STREAM, 0));
- if (handle.valid() && tweak(handle)) {
- if ((::connect(handle.get(), addr(), _size) == 0) || (errno == EINPROGRESS)) {
- return handle;
- }
+ return SocketHandle(::socket(_addr.ss_family, SOCK_STREAM, 0));
+ }
+ return SocketHandle();
+}
+
+SocketHandle
+SocketAddress::connect(const std::function<bool(SocketHandle&)> &tweak) const
+{
+ SocketHandle handle = raw_socket();
+ if (handle.valid() && tweak(handle)) {
+ if ((::connect(handle.get(), addr(), _size) == 0) || (errno == EINPROGRESS)) {
+ return handle;
}
}
return SocketHandle();
@@ -168,20 +175,18 @@ SocketAddress::connect(const std::function<bool(SocketHandle&)> &tweak) const
SocketHandle
SocketAddress::listen(int backlog) const
{
- if (valid()) {
- SocketHandle handle(socket(_addr.ss_family, SOCK_STREAM, 0));
- if (handle.valid()) {
- if (is_ipv6()) {
- handle.set_ipv6_only(false);
- }
- if (port() > 0) {
- handle.set_reuse_addr(true);
- }
- if ((bind(handle.get(), addr(), _size) == 0) &&
- (::listen(handle.get(), backlog) == 0))
- {
- return handle;
- }
+ SocketHandle handle = raw_socket();
+ if (handle.valid()) {
+ if (is_ipv6()) {
+ handle.set_ipv6_only(false);
+ }
+ if (port() > 0) {
+ handle.set_reuse_addr(true);
+ }
+ if ((bind(handle.get(), addr(), _size) == 0) &&
+ (::listen(handle.get(), backlog) == 0))
+ {
+ return handle;
}
}
return SocketHandle();
diff --git a/vespalib/src/vespa/vespalib/net/socket_address.h b/vespalib/src/vespa/vespalib/net/socket_address.h
index 6dfe843c12e..88da83d902b 100644
--- a/vespalib/src/vespa/vespalib/net/socket_address.h
+++ b/vespalib/src/vespa/vespalib/net/socket_address.h
@@ -35,6 +35,8 @@ public:
memcpy(this, &rhs, sizeof(SocketAddress));
return *this;
}
+ const sockaddr *raw_addr() const { return addr(); }
+ socklen_t raw_addr_len() const { return _size; }
bool valid() const { return (_size >= sizeof(sa_family_t)); }
bool is_ipv4() const { return (valid() && (_addr.ss_family == AF_INET)); }
bool is_ipv6() const { return (valid() && (_addr.ss_family == AF_INET6)); }
@@ -47,6 +49,7 @@ public:
vespalib::string path() const;
vespalib::string name() const;
vespalib::string spec() const;
+ SocketHandle raw_socket() const;
SocketHandle connect(const std::function<bool(SocketHandle&)> &tweak) const;
SocketHandle connect() const { return connect([](SocketHandle&) noexcept { return true; }); }
SocketHandle connect_async() const {