From 19e3a1addbc4cbd7666fe1842db2fb22bba491ff Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Wed, 25 Jan 2023 11:58:47 +0000 Subject: io uring async io implementation --- vespalib/src/tests/coro/async_io/async_io_test.cpp | 26 +- .../tests/coro/waiting_for/waiting_for_test.cpp | 43 ++- vespalib/src/vespa/vespalib/coro/async_io.cpp | 35 ++- vespalib/src/vespa/vespalib/coro/async_io.h | 9 +- .../src/vespa/vespalib/coro/io_uring_thread.hpp | 295 +++++++++++++++++++++ vespalib/src/vespa/vespalib/coro/waiting_for.h | 70 +++-- vespalib/src/vespa/vespalib/net/socket_address.cpp | 45 ++-- vespalib/src/vespa/vespalib/net/socket_address.h | 3 + 8 files changed, 460 insertions(+), 66 deletions(-) create mode 100644 vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp diff --git a/vespalib/src/tests/coro/async_io/async_io_test.cpp b/vespalib/src/tests/coro/async_io/async_io_test.cpp index f5098e30086..2e783c46e6f 100644 --- a/vespalib/src/tests/coro/async_io/async_io_test.cpp +++ b/vespalib/src/tests/coro/async_io/async_io_test.cpp @@ -21,6 +21,14 @@ using namespace vespalib; using namespace vespalib::coro; using namespace vespalib::test; +vespalib::string impl_spec(AsyncIo &async) { + switch (async.get_impl_tag()) { + case AsyncIo::ImplTag::EPOLL: return "epoll"; + case AsyncIo::ImplTag::URING: return "uring"; + } + abort(); +} + Detached self_exiting_run_loop(AsyncIo::SP async) { for (size_t i = 0; co_await async->schedule(); ++i) { fprintf(stderr, "self_exiting_run_loop -> current value: %zu\n", i); @@ -39,7 +47,7 @@ Work run_loop(AsyncIo &async, int a, int b) { TEST(AsyncIoTest, create_async_io) { auto async = AsyncIo::create(); AsyncIo &api = async; - fprintf(stderr, "async_io impl: %s\n", api.get_impl_spec().c_str()); + fprintf(stderr, "async_io impl: %s\n", impl_spec(api).c_str()); } TEST(AsyncIoTest, run_stuff_in_async_io_context) { @@ -132,10 +140,12 @@ Work async_client(AsyncIo &async, CryptoEngine &engine, ServerSocket &server_soc co_return co_await verify_socket_io(*socket, false); } -void verify_socket_io(CryptoEngine &engine) { +void verify_socket_io(CryptoEngine &engine, AsyncIo::ImplTag prefer_impl = AsyncIo::ImplTag::EPOLL) { ServerSocket server_socket("tcp/0"); server_socket.set_blocking(false); - auto async = AsyncIo::create(); + auto async = AsyncIo::create(prefer_impl); + fprintf(stderr, "verify_socket_io: crypto engine: %s, async impl: %s\n", + getClassName(engine).c_str(), impl_spec(async).c_str()); auto f1 = make_future(async_server(async, engine, server_socket)); auto f2 = make_future(async_client(async, engine, server_socket)); (void) f1.get(); @@ -162,4 +172,14 @@ TEST(AsyncIoTest, maybe_tls_false_socket_io) { verify_socket_io(engine); } +TEST(AsyncIoTest, raw_socket_io_with_io_uring_maybe) { + NullCryptoEngine engine; + verify_socket_io(engine, AsyncIo::ImplTag::URING); +} + +TEST(AsyncIoTest, tls_socket_io_with_io_uring_maybe) { + TlsCryptoEngine engine(make_tls_options_for_testing()); + verify_socket_io(engine, AsyncIo::ImplTag::URING); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp b/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp index 385d4ad24e3..c6678657c34 100644 --- a/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp +++ b/vespalib/src/tests/coro/waiting_for/waiting_for_test.cpp @@ -11,29 +11,29 @@ using namespace vespalib::coro; struct AsyncService { std::vector> pending; auto get_value() { - return awaiter_for([&](WaitingFor handle) - { - pending.push_back(std::move(handle)); - }); + return wait_for([&](WaitingFor handle) + { + pending.push_back(std::move(handle)); + }); } }; struct AsyncVoidService { std::vector pending; auto get_value() { - return awaiter_for([&](WaitingFor handle) - { - pending.push_back(handle.release()); - }); + return wait_for([&](WaitingFor handle) + { + pending.push_back(handle.release()); + }); } }; struct SyncService { auto get_value() { - return awaiter_for([](WaitingFor handle) + return wait_for([](WaitingFor handle) { handle.set_value(42); - return handle.release_waiter(); // symmetric transfer + return handle.mu(); // symmetric transfer }); } }; @@ -45,8 +45,8 @@ Lazy wait_for_value(Service &service) { } template -Lazy wait_any(auto &&fun) { - T result = co_await fun(); +Lazy wait_for_fun(auto &&fun) { + T result = co_await wait_for(fun); co_return std::move(result); } @@ -62,6 +62,23 @@ TEST(WaitingForTest, wait_for_external_async_int) { EXPECT_EQ(res.get(), 42); } +TEST(WaitingForTest, wait_for_external_async_int_calculated_by_coroutine) { + AsyncService service1; + AsyncService service2; + auto res = make_future(wait_for_value(service1)); + ASSERT_EQ(service1.pending.size(), 1); + { + async_wait(wait_for_value(service2), std::move(service1.pending[0])); + service1.pending.clear(); + } + EXPECT_TRUE(res.wait_for(0ms) == std::future_status::timeout); + ASSERT_EQ(service2.pending.size(), 1); + service2.pending[0].set_value(42); + service2.pending.clear(); + EXPECT_TRUE(res.wait_for(0ms) == std::future_status::ready); + EXPECT_EQ(res.get(), 42); +} + TEST(WaitingForTest, wait_for_external_async_int_via_void_ptr) { AsyncVoidService service; auto res = make_future(wait_for_value(service)); @@ -86,7 +103,7 @@ TEST(WaitingForTest, wait_for_external_sync_int) { TEST(WaitingForTest, wait_for_move_only_value) { auto val = std::make_unique(42); auto fun = [&val](auto handle){ handle.set_value(std::move(val)); }; // asymmetric transfer - auto res = make_future(wait_any([&fun](){ return awaiter_for(fun); })); + auto res = make_future(wait_for_fun(fun)); EXPECT_TRUE(res.wait_for(0ms) == std::future_status::ready); EXPECT_EQ(*res.get(), 42); } diff --git a/vespalib/src/vespa/vespalib/coro/async_io.cpp b/vespalib/src/vespa/vespalib/coro/async_io.cpp index 8d628fd7887..86eddfbd035 100644 --- a/vespalib/src/vespa/vespalib/coro/async_io.cpp +++ b/vespalib/src/vespa/vespalib/coro/async_io.cpp @@ -4,12 +4,28 @@ #include "detached.h" #include #include +#include +#include +#include #include #include #include #include +#ifdef VESPA_HAS_IO_URING +#include "io_uring_thread.hpp" +namespace { +bool can_use_io_uring() { return vespalib::coro::UringProbe::check_support(); } +vespalib::coro::AsyncIo::SP create_io_uring_thread() { return std::make_shared(); } +} +#else +namespace { +bool can_use_io_uring() { return false; } +vespalib::coro::AsyncIo::SP create_io_uring_thread() { abort(); } +} +#endif + namespace vespalib::coro { namespace { @@ -195,15 +211,17 @@ struct SelectorThread : AsyncIo { writer.resume(); } } - vespalib::string get_impl_spec() override { - return "selector-thread"; - } + ImplTag get_impl_tag() override { return ImplTag::EPOLL; } Lazy accept(ServerSocket &server_socket) override { bool in_thread = co_await enter_thread(); if (in_thread) { bool can_read = co_await readable(server_socket.get_fd()); if (can_read) { - co_return server_socket.accept(); + auto res = server_socket.accept(); + if (res.valid()) { + res.set_blocking(false); + } + co_return res; } } co_return SocketHandle(-ECANCELED); @@ -325,7 +343,7 @@ SelectorThread::~SelectorThread() REQUIRE(_queue.empty()); } -} +} // AsyncIo::~AsyncIo() = default; AsyncIo::AsyncIo() = default; @@ -367,8 +385,11 @@ AsyncIo::Owner::~Owner() } AsyncIo::Owner -AsyncIo::create() { +AsyncIo::create(ImplTag prefer_impl) { + if (prefer_impl == ImplTag::URING && can_use_io_uring()) { + return Owner(create_io_uring_thread()); + } return Owner(std::make_shared()); } -} +} // vespalib::coro diff --git a/vespalib/src/vespa/vespalib/coro/async_io.h b/vespalib/src/vespa/vespalib/coro/async_io.h index 56d2aae7fdf..285821fef85 100644 --- a/vespalib/src/vespa/vespalib/coro/async_io.h +++ b/vespalib/src/vespa/vespalib/coro/async_io.h @@ -26,6 +26,10 @@ struct AsyncIo : std::enable_shared_from_this { virtual ~AsyncIo(); using SP = std::shared_ptr; + // tag used to separate implementations + enum class ImplTag { EPOLL, URING }; + static constexpr ImplTag default_impl() { return ImplTag::EPOLL; } + // thin wrapper used by the owner to handle lifetime class Owner { private: @@ -46,10 +50,11 @@ struct AsyncIo : std::enable_shared_from_this { }; // create an async_io 'runtime' - static Owner create(); + // note that the preferred implementation may not be available + static Owner create(ImplTag prefer_impl = default_impl()); // implementation tag - virtual vespalib::string get_impl_spec() = 0; + virtual ImplTag get_impl_tag() = 0; // api for async io used by coroutines virtual Lazy accept(ServerSocket &server_socket) = 0; diff --git a/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp b/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp new file mode 100644 index 00000000000..143e65d02d4 --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/io_uring_thread.hpp @@ -0,0 +1,295 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +// this file is included by async_io.cpp if VESPA_HAS_IO_URING is defined + +#include +#include +#include + +namespace vespalib::coro { +namespace { + +using Handle = std::coroutine_handle<>; +using cqe_res_t = int32_t; + +// Server sockets are always non-blocking. We need to temporarily set +// them to blocking mode to avoid that async accept return -EAGAIN. +struct BlockingGuard { + int fd; + BlockingGuard(int fd_in) : fd(fd_in) { + SocketOptions::set_blocking(fd, true); + } + ~BlockingGuard() { + SocketOptions::set_blocking(fd, false); + } +}; + +struct UringProbe { + io_uring_probe *probe; + UringProbe() : probe(io_uring_get_probe()) {} + ~UringProbe() { free(probe); } + bool check(int opcode) { + return probe && io_uring_opcode_supported(probe, opcode); + } + static bool check_support() { + UringProbe probe; + return probe.check(IORING_OP_ACCEPT) + && probe.check(IORING_OP_CONNECT) + && probe.check(IORING_OP_READ) + && probe.check(IORING_OP_WRITE); + } +}; + +struct Uring { + io_uring uring; + size_t pending; + Uring() : pending(0) { + int res = io_uring_queue_init(4096, &uring, 0); + REQUIRE_EQ(res, 0); + } + auto *get_sqe() { + auto *res = io_uring_get_sqe(&uring); + while (res == nullptr) { + auto submit_res = io_uring_submit(&uring); + REQUIRE(submit_res >= 0); + res = io_uring_get_sqe(&uring); + } + ++pending; + return res; + } + void submit_and_dispatch() { + auto res = io_uring_submit_and_wait(&uring, 1); + REQUIRE(res >= 0); + io_uring_cqe *cqe = nullptr; + while (io_uring_peek_cqe(&uring, &cqe) == 0) { + auto wf = WaitingFor::from_pointer(io_uring_cqe_get_data(cqe)); + wf.set_value(cqe->res); + io_uring_cqe_seen(&uring, cqe); + --pending; + } + } + void drain_pending() { + while (pending > 0) { + auto res = io_uring_submit_and_wait(&uring, 1); + REQUIRE(res >= 0); + io_uring_cqe *cqe = nullptr; + while (io_uring_peek_cqe(&uring, &cqe) == 0) { + auto wf = WaitingFor::from_pointer(io_uring_cqe_get_data(cqe)); + wf.set_value(-ECANCELED); + io_uring_cqe_seen(&uring, cqe); + --pending; + } + } + } + ~Uring() { + REQUIRE_EQ(pending, 0u); + io_uring_queue_exit(&uring); + } +}; + +auto wait_for_sqe(auto *sqe) { + return wait_for([sqe](auto wf) + { + io_uring_sqe_set_data(sqe, wf.release()); + }); +} + +struct IoUringThread : AsyncIo { + + struct RunGuard; + using ThreadId = std::atomic; + using RunOp = WaitingFor; + + Uring _uring; + SocketHandle _event; + std::thread _thread; + ThreadId _thread_id; + std::vector _todo; + std::mutex _lock; + std::vector _queue; + + IoUringThread() + : _uring(), + _event(eventfd(0, 0)), + _thread(), + _thread_id(std::thread::id()), + _todo(), + _lock(), + _queue() + { + static_assert(ThreadId::is_always_lock_free); + REQUIRE(_event.valid()); + } + void start() override; + void main(); + void init_shutdown() override; + void fini_shutdown() override; + ~IoUringThread(); + bool running() const noexcept { + return (_thread_id.load(std::memory_order_relaxed) != std::thread::id()); + } + bool stopped() const noexcept { + return (_thread_id.load(std::memory_order_relaxed) == std::thread::id()); + } + bool in_thread() const noexcept { + return (std::this_thread::get_id() == _thread_id.load(std::memory_order_relaxed)); + } + auto protect() { return std::lock_guard(_lock); } + void wakeup() { + uint64_t value = 1; + int res = ::write(_event.get(), &value, sizeof(value)); + REQUIRE_EQ(res, int(sizeof(value))); + } + auto async_run() { + return wait_for([this](auto wf) + ->std::coroutine_handle<> + { + bool need_wakeup = false; + { + auto guard = protect(); + if (stopped()) { + wf.set_value(false); + return wf.mu(); + } + need_wakeup = _queue.empty(); + _queue.push_back(std::move(wf)); + } + if (need_wakeup) { + wakeup(); + } + return std::noop_coroutine(); + }); + } + void handle_queue(bool result) { + { + auto guard = protect(); + std::swap(_todo, _queue); + } + for (auto &&item: _todo) { + auto wf = std::move(item); + wf.set_value(result); + } + _todo.clear(); + } + ImplTag get_impl_tag() override { return ImplTag::URING; } + Lazy accept(ServerSocket &server_socket) override { + BlockingGuard blocking_guard(server_socket.get_fd()); + int res = -ECANCELED; + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { + auto *sqe = _uring.get_sqe(); + io_uring_prep_accept(sqe, server_socket.get_fd(), nullptr, nullptr, 0); + res = co_await wait_for_sqe(sqe); + } + co_return SocketHandle(res); + } + Lazy connect(const SocketAddress &addr) override { + SocketHandle handle = addr.raw_socket(); + if (handle.valid()) { + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { + auto *sqe = _uring.get_sqe(); + io_uring_prep_connect(sqe, handle.get(), addr.raw_addr(), addr.raw_addr_len()); + int res = co_await wait_for_sqe(sqe); + if (res < 0) { + handle.reset(res); + } + } + } + co_return handle; + } + Lazy read(SocketHandle &socket, char *buf, size_t len) override { + ssize_t res = -ECANCELED; + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { + auto *sqe = _uring.get_sqe(); + io_uring_prep_read(sqe, socket.get(), buf, len, 0); + res = co_await wait_for_sqe(sqe); + } + co_return res; + } + Lazy write(SocketHandle &socket, const char *buf, size_t len) override { + ssize_t res = -ECANCELED; + bool inside = in_thread() ? true : co_await async_run(); + if (inside) { + auto *sqe = _uring.get_sqe(); + io_uring_prep_write(sqe, socket.get(), buf, len, 0); + res = co_await wait_for_sqe(sqe); + } + co_return res; + } + Lazy schedule() override { + co_return co_await async_run(); + } + Detached consume_events() { + uint64_t value = 0; + REQUIRE(in_thread()); + int res = sizeof(value); + while (running() && (res == sizeof(value))) { + auto *sqe = _uring.get_sqe(); + io_uring_prep_read(sqe, _event.get(), &value, sizeof(value), 0); + res = co_await wait_for_sqe(sqe); + } + } + Detached async_shutdown() { + bool inside = in_thread() ? true : co_await async_run(); + REQUIRE(inside && "unable to initialize shutdown of internal thread"); + { + auto guard = protect(); + _thread_id = std::thread::id(); + } + } +}; + +void +IoUringThread::start() +{ + _thread = std::thread(&IoUringThread::main, this); + _thread_id.wait(std::thread::id()); +} + +struct IoUringThread::RunGuard { + IoUringThread &self; + RunGuard(IoUringThread &self_in) noexcept : self(self_in) { + self._thread_id = std::this_thread::get_id(); + self._thread_id.notify_all(); + self.consume_events(); + } + ~RunGuard() { + REQUIRE(self.stopped()); + self.wakeup(); + self.handle_queue(false); + self._uring.drain_pending(); + } +}; + +void +IoUringThread::main() +{ + RunGuard guard(*this); + while (running()) { + _uring.submit_and_dispatch(); + handle_queue(true); + } +} + +void +IoUringThread::init_shutdown() +{ + async_shutdown(); +} + +void +IoUringThread::fini_shutdown() +{ + _thread.join(); +} + +IoUringThread::~IoUringThread() +{ + REQUIRE(_todo.empty()); + REQUIRE(_queue.empty()); +} + +} // +} // vespalib::coro diff --git a/vespalib/src/vespa/vespalib/coro/waiting_for.h b/vespalib/src/vespa/vespalib/coro/waiting_for.h index 2e11a9cb38c..d82d0779f41 100644 --- a/vespalib/src/vespa/vespalib/coro/waiting_for.h +++ b/vespalib/src/vespa/vespalib/coro/waiting_for.h @@ -48,9 +48,6 @@ public: void set_done() { _state->result.set_done(); } - std::coroutine_handle<> release_waiter() { - return std::exchange(_state->waiter, std::noop_coroutine()); - } void *release() { return std::exchange(_state, nullptr); } @@ -61,6 +58,18 @@ public: static WaitingFor from_state(PromiseState &state) { return {&state}; } + // Unasking the question. This will drop the internal reference to + // the promise state and return the handle for the waiting + // coroutine. A function responsible for starting an async + // operation may chose to do 'wf.set_value()' followed by + // 'return wf.mu()' to convert the async operation to a sync + // operation by immediately resuming the waiting coroutine (by + // symmetrically transferring control to itself). + [[nodiscard]] std::coroutine_handle<> mu() { + auto handle = std::exchange(_state->waiter, std::noop_coroutine()); + _state = nullptr; + return handle; + } }; template @@ -74,35 +83,54 @@ WaitingFor::~WaitingFor() static_assert(receiver_of, int>); static_assert(receiver_of>, std::unique_ptr>); +// concept indicating that F is a function that starts an async +// operation with T as result. The result will eventually be delivered +// to the WaitingFor passed to the function. +template +concept start_async_op = requires(F &&f, std::decay_t cpy, WaitingFor wf) { + std::decay_t(std::forward(f)); + { cpy(std::move(wf)) } -> std::same_as; +}; + +// concept indicating that F is a function that starts an async +// operation with T as result. The result will eventually be delivered +// to the WaitingFor passed to the function. This variant will use +// symmetric transfer to switch to another coroutine as a side-effect +// of starting the async operation (and thus suspending the +// coroutine). This also enables the function to change the operation +// form async to sync by setting the value directly in the function +// and returning wf.mu() +template +concept maybe_start_async_op = requires(F &&f, std::decay_t cpy, WaitingFor wf) { + std::decay_t(std::forward(f)); + { cpy(std::move(wf)) } -> std::same_as>; +}; + // Create a custom awaiter that will return a value of type T when the -// coroutine is resumed. The waiting coroutine will be represented as -// a WaitingFor that is passed as the only parameter to 'f'. The -// return value of 'f' is returned from await_suspend, which means it -// must be void, bool or coroutine handle. If 'f' returns a value -// indicating that the coroutine should be resumed immediately, -// WaitingFor::release_waiter() must be called to avoid resume -// being called as well. Note that await_ready will always return -// false, since the coroutine needs to be suspended in order to create -// the WaitingFor object needed. Also, the WaitingFor api -// implies that the value will be set from the outside and thus cannot -// be ready up-front. Also note that await_resume must return T by -// value, since the awaiter containing the result is a temporary -// object. +// coroutine is resumed. The operation waited upon is represented by +// the function object used to start it. Note that await_ready will +// always return false, since the coroutine needs to be suspended in +// order to create the WaitingFor object needed. Also, the +// WaitingFor api implies that the value will be set from the +// outside and thus cannot be ready up-front. Also note that +// await_resume must return T by value, since the awaiter containing +// the result is a temporary object. template -auto awaiter_for(F &&f) { +requires start_async_op || maybe_start_async_op +auto wait_for(F &&on_suspend) { struct awaiter final : PromiseState { using PromiseState::result; using PromiseState::waiter; - std::decay_t fun; - awaiter(F &&f) : PromiseState(), fun(std::forward(f)) {} + std::decay_t on_suspend; + awaiter(F &&on_suspend_in) : PromiseState(), on_suspend(std::forward(on_suspend_in)) {} bool await_ready() const noexcept { return false; } T await_resume() { return std::move(result).get_value(); } decltype(auto) await_suspend(std::coroutine_handle<> handle) __attribute__((noinline)) { waiter = handle; - return fun(WaitingFor::from_state(*this)); + return on_suspend(WaitingFor::from_state(*this)); } }; - return awaiter(std::forward(f)); + return awaiter(std::forward(on_suspend)); } } diff --git a/vespalib/src/vespa/vespalib/net/socket_address.cpp b/vespalib/src/vespa/vespalib/net/socket_address.cpp index c1fb1cb0858..4c552b03de7 100644 --- a/vespalib/src/vespa/vespalib/net/socket_address.cpp +++ b/vespalib/src/vespa/vespalib/net/socket_address.cpp @@ -152,14 +152,21 @@ SocketAddress::spec() const } SocketHandle -SocketAddress::connect(const std::function &tweak) const +SocketAddress::raw_socket() const { if (valid()) { - SocketHandle handle(socket(_addr.ss_family, SOCK_STREAM, 0)); - if (handle.valid() && tweak(handle)) { - if ((::connect(handle.get(), addr(), _size) == 0) || (errno == EINPROGRESS)) { - return handle; - } + return SocketHandle(::socket(_addr.ss_family, SOCK_STREAM, 0)); + } + return SocketHandle(); +} + +SocketHandle +SocketAddress::connect(const std::function &tweak) const +{ + SocketHandle handle = raw_socket(); + if (handle.valid() && tweak(handle)) { + if ((::connect(handle.get(), addr(), _size) == 0) || (errno == EINPROGRESS)) { + return handle; } } return SocketHandle(); @@ -168,20 +175,18 @@ SocketAddress::connect(const std::function &tweak) const SocketHandle SocketAddress::listen(int backlog) const { - if (valid()) { - SocketHandle handle(socket(_addr.ss_family, SOCK_STREAM, 0)); - if (handle.valid()) { - if (is_ipv6()) { - handle.set_ipv6_only(false); - } - if (port() > 0) { - handle.set_reuse_addr(true); - } - if ((bind(handle.get(), addr(), _size) == 0) && - (::listen(handle.get(), backlog) == 0)) - { - return handle; - } + SocketHandle handle = raw_socket(); + if (handle.valid()) { + if (is_ipv6()) { + handle.set_ipv6_only(false); + } + if (port() > 0) { + handle.set_reuse_addr(true); + } + if ((bind(handle.get(), addr(), _size) == 0) && + (::listen(handle.get(), backlog) == 0)) + { + return handle; } } return SocketHandle(); diff --git a/vespalib/src/vespa/vespalib/net/socket_address.h b/vespalib/src/vespa/vespalib/net/socket_address.h index 6dfe843c12e..88da83d902b 100644 --- a/vespalib/src/vespa/vespalib/net/socket_address.h +++ b/vespalib/src/vespa/vespalib/net/socket_address.h @@ -35,6 +35,8 @@ public: memcpy(this, &rhs, sizeof(SocketAddress)); return *this; } + const sockaddr *raw_addr() const { return addr(); } + socklen_t raw_addr_len() const { return _size; } bool valid() const { return (_size >= sizeof(sa_family_t)); } bool is_ipv4() const { return (valid() && (_addr.ss_family == AF_INET)); } bool is_ipv6() const { return (valid() && (_addr.ss_family == AF_INET6)); } @@ -47,6 +49,7 @@ public: vespalib::string path() const; vespalib::string name() const; vespalib::string spec() const; + SocketHandle raw_socket() const; SocketHandle connect(const std::function &tweak) const; SocketHandle connect() const { return connect([](SocketHandle&) noexcept { return true; }); } SocketHandle connect_async() const { -- cgit v1.2.3