diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2019-03-15 12:19:56 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2019-03-15 13:26:48 +0100 |
commit | 8bf4079070df81a6b872365d20d48dbcecfba687 (patch) | |
tree | a4530c0d6ebe296f44a91469073850cc16d6d0bd /vespalib | |
parent | 2e8f039246b4b69303f2da48f37819a6c24ce887 (diff) |
Perform basic epoll emulation on Darwin.
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/net/selector/selector_test.cpp | 4 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/CMakeLists.txt | 10 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/emulated_epoll.cpp | 108 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/emulated_epoll.h | 45 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/native_epoll.cpp | 65 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/native_epoll.h | 26 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/selector.cpp | 94 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/selector.h | 48 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/socket_utils.cpp | 39 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/socket_utils.h | 16 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/wakeup_pipe.cpp | 35 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/wakeup_pipe.h | 27 |
12 files changed, 379 insertions, 138 deletions
diff --git a/vespalib/src/tests/net/selector/selector_test.cpp b/vespalib/src/tests/net/selector/selector_test.cpp index 302de712ef8..7d7a147bcb2 100644 --- a/vespalib/src/tests/net/selector/selector_test.cpp +++ b/vespalib/src/tests/net/selector/selector_test.cpp @@ -2,7 +2,7 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/vespalib/net/socket_address.h> #include <vespa/vespalib/net/selector.h> -#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/net/socket_utils.h> #include <thread> #include <functional> #include <chrono> @@ -18,7 +18,7 @@ struct SocketPair { SocketPair &operator=(SocketPair &&) = default; static SocketPair create() { int sockets[2]; - ASSERT_EQUAL(0, socketpair(AF_UNIX, SOCK_STREAM | O_NONBLOCK, 0, sockets)); + socketutils::nonblocking_socketpair(AF_UNIX, SOCK_STREAM, 0, sockets); return SocketPair(sockets[0], sockets[1]); } ~SocketPair() {} diff --git a/vespalib/src/vespa/vespalib/net/CMakeLists.txt b/vespalib/src/vespa/vespalib/net/CMakeLists.txt index 53f66c240fc..a71da833e16 100644 --- a/vespalib/src/vespa/vespalib/net/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/net/CMakeLists.txt @@ -1,4 +1,11 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +if(CMAKE_HOST_SYSTEM_NAME STREQUAL "Darwin") + set(VESPA_EPOLL_FLAVOUR "emulated_epoll.cpp") +else() + set(VESPA_EPOLL_FLAVOUR "native_epoll.cpp") +endif() + vespa_add_library(vespalib_vespalib_net OBJECT SOURCES async_resolver.cpp @@ -11,6 +18,9 @@ vespa_add_library(vespalib_vespalib_net OBJECT socket_handle.cpp socket_options.cpp socket_spec.cpp + socket_utils.cpp sync_crypto_socket.cpp + wakeup_pipe.cpp + ${VESPA_EPOLL_FLAVOUR} DEPENDS ) diff --git a/vespalib/src/vespa/vespalib/net/emulated_epoll.cpp b/vespalib/src/vespa/vespalib/net/emulated_epoll.cpp new file mode 100644 index 00000000000..f4eb8ca3703 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/emulated_epoll.cpp @@ -0,0 +1,108 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "emulated_epoll.h" +#include <chrono> +#include <cstring> +#include <vector> + +namespace vespalib { + +Epoll::Epoll() + : _monitorlock(), + _wakeup(), + _monitored() +{ +} + +Epoll::~Epoll() = default; + +uint32_t maybe(uint32_t value, bool yes) { return yes ? value : 0; } + +void +Epoll::add(int fd, void *ctx, bool read, bool write) +{ + epoll_event evt; + evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write); + evt.data.ptr = ctx; + std::lock_guard guard(_monitorlock); + _monitored[fd] = evt; + _wakeup.write_token(); +} + +void +Epoll::update(int fd, void *ctx, bool read, bool write) +{ + epoll_event evt; + evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write); + evt.data.ptr = ctx; + std::lock_guard guard(_monitorlock); + _monitored[fd] = evt; + _wakeup.write_token(); +} + +void +Epoll::remove(int fd) +{ + epoll_event evt; + memset(&evt, 0, sizeof(evt)); + std::lock_guard guard(_monitorlock); + _monitored.erase(fd); + _wakeup.write_token(); +} + +size_t +Epoll::wait(epoll_event *events, size_t max_events, int timeout_ms) +{ + size_t evidx = 0; + std::vector<pollfd> fds; + bool allowRetry = true; + auto entryTime = std::chrono::steady_clock::now(); + int timeout_ms_initial = timeout_ms; + (void) timeout_ms_initial; + for (;evidx == 0 && allowRetry;) { + { + std::lock_guard guard(_monitorlock); + fds.resize(_monitored.size() + 1); + fds[0].fd = _wakeup.get_read_fd(); + fds[0].events = POLLIN; + fds[0].revents = 0; + size_t fdidx = 1; + for (const auto &mon : _monitored) { + fds[fdidx].fd = mon.first; + fds[fdidx].events = mon.second.events; + fds[fdidx].revents = 0; + ++fdidx; + } + } + allowRetry = false; + int res = poll(&fds[0], fds.size(), timeout_ms); + if (res > 0) { + std::lock_guard guard(_monitorlock); + if (fds[0].revents != 0) { // Internal epoll emulation wakeup + _wakeup.read_tokens(); + auto retryTime = std::chrono::steady_clock::now(); + auto delay = std::chrono::duration<double, std::milli>(retryTime - entryTime).count(); + if (delay < timeout_ms_initial) { + timeout_ms = timeout_ms_initial - 1 - delay; + allowRetry = true; // woken up by internal wakeup + } + } + for (size_t fdidx = 1; fdidx < fds.size() && evidx < max_events; ++fdidx) { + if (fds[fdidx].revents != 0) { + int fd = fds[fdidx].fd; + auto monitr = _monitored.find(fd); + if (monitr != _monitored.end()) { + events[evidx].events = fds[fdidx].revents; + events[evidx].data.ptr = monitr->second.data.ptr; + ++evidx; + } + } + } + } else { + return 0; // timeout + } + } + return evidx; +} + +} diff --git a/vespalib/src/vespa/vespalib/net/emulated_epoll.h b/vespalib/src/vespa/vespalib/net/emulated_epoll.h new file mode 100644 index 00000000000..8d21bdfedaa --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/emulated_epoll.h @@ -0,0 +1,45 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "wakeup_pipe.h" +#include <poll.h> +#include <map> +#include <mutex> + +#define EPOLLERR POLLERR +#define EPOLLHUP POLLHUP +#define EPOLLIN POLLIN +#define EPOLLOUT POLLOUT + +namespace vespalib { + +// structure describing which event occurred. +struct epoll_event +{ + struct { + void *ptr; + } data; + uint32_t events; +}; + +/** + * The Epoll class is a thin wrapper around basic emulation of the epoll + * related system calls. + **/ +class Epoll +{ +private: + std::mutex _monitorlock; + WakeupPipe _wakeup; + std::map<int, epoll_event> _monitored; +public: + Epoll(); + ~Epoll(); + void add(int fd, void *ctx, bool read, bool write); + void update(int fd, void *ctx, bool read, bool write); + void remove(int fd); + size_t wait(epoll_event *events, size_t max_events, int timeout_ms); +}; + +} diff --git a/vespalib/src/vespa/vespalib/net/native_epoll.cpp b/vespalib/src/vespa/vespalib/net/native_epoll.cpp new file mode 100644 index 00000000000..7e3e62a5900 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/native_epoll.cpp @@ -0,0 +1,65 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "native_epoll.h" +#include <cassert> +#include <cerrno> +#include <unistd.h> +#include <vespa/log/log.h> + +namespace vespalib { + +uint32_t maybe(uint32_t value, bool yes) { return yes ? value : 0; } + +void check(int res) { + if (res == -1) { + if (errno == ENOMEM) { + LOG_ABORT("out of memory"); + } + } +} + +Epoll::Epoll() + : _epoll_fd(epoll_create1(0)) +{ + assert(_epoll_fd != -1); +} + +Epoll::~Epoll() +{ + close(_epoll_fd); +} + +void +Epoll::add(int fd, void *ctx, bool read, bool write) +{ + epoll_event evt; + evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write); + evt.data.ptr = ctx; + check(epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &evt)); +} + +void +Epoll::update(int fd, void *ctx, bool read, bool write) +{ + epoll_event evt; + evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write); + evt.data.ptr = ctx; + check(epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &evt)); +} + +void +Epoll::remove(int fd) +{ + epoll_event evt; + memset(&evt, 0, sizeof(evt)); + check(epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, &evt)); +} + +size_t +Epoll::wait(epoll_event *events, size_t max_events, int timeout_ms) +{ + int res = epoll_wait(_epoll_fd, events, max_events, timeout_ms); + return std::max(res, 0); +} + +} diff --git a/vespalib/src/vespa/vespalib/net/native_epoll.h b/vespalib/src/vespa/vespalib/net/native_epoll.h new file mode 100644 index 00000000000..032c64a7787 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/native_epoll.h @@ -0,0 +1,26 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <sys/epoll.h> + +namespace vespalib { + +/** + * The Epoll class is a thin wrapper around the epoll related system + * calls. + **/ +class Epoll +{ +private: + int _epoll_fd; +public: + Epoll(); + ~Epoll(); + void add(int fd, void *ctx, bool read, bool write); + void update(int fd, void *ctx, bool read, bool write); + void remove(int fd); + size_t wait(epoll_event *events, size_t max_events, int timeout_ms); +}; + +} diff --git a/vespalib/src/vespa/vespalib/net/selector.cpp b/vespalib/src/vespa/vespalib/net/selector.cpp index 762849b81f0..0dcc852cf3a 100644 --- a/vespalib/src/vespa/vespalib/net/selector.cpp +++ b/vespalib/src/vespa/vespalib/net/selector.cpp @@ -1,13 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "selector.h" -#include <cassert> -#include <cstdlib> -#include <cerrno> -#include <cstring> -#include <unistd.h> -#include <fcntl.h> -#include <vespa/log/log.h> namespace vespalib { @@ -35,97 +28,10 @@ struct SingleFdHandler { } }; -//----------------------------------------------------------------------------- - -uint32_t maybe(uint32_t value, bool yes) { return yes ? value : 0; } - -void check(int res) { - if (res == -1) { - if (errno == ENOMEM) { - LOG_ABORT("out of memory"); - } - } -} - } // namespace vespalib::<unnamed> //----------------------------------------------------------------------------- -WakeupPipe::WakeupPipe() - : _pipe() -{ - int res = pipe2(_pipe, O_NONBLOCK); - assert(res == 0); -} - -WakeupPipe::~WakeupPipe() -{ - close(_pipe[0]); - close(_pipe[1]); -} - -void -WakeupPipe::write_token() -{ - char token = 'T'; - [[maybe_unused]] ssize_t res = write(_pipe[1], &token, 1); -} - -void -WakeupPipe::read_tokens() -{ - char token_trash[128]; - [[maybe_unused]] ssize_t res = read(_pipe[0], token_trash, sizeof(token_trash)); -} - -//----------------------------------------------------------------------------- - -Epoll::Epoll() - : _epoll_fd(epoll_create1(0)) -{ - assert(_epoll_fd != -1); -} - -Epoll::~Epoll() -{ - close(_epoll_fd); -} - -void -Epoll::add(int fd, void *ctx, bool read, bool write) -{ - epoll_event evt; - evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write); - evt.data.ptr = ctx; - check(epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &evt)); -} - -void -Epoll::update(int fd, void *ctx, bool read, bool write) -{ - epoll_event evt; - evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write); - evt.data.ptr = ctx; - check(epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &evt)); -} - -void -Epoll::remove(int fd) -{ - epoll_event evt; - memset(&evt, 0, sizeof(evt)); - check(epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, &evt)); -} - -size_t -Epoll::wait(epoll_event *events, size_t max_events, int timeout_ms) -{ - int res = epoll_wait(_epoll_fd, events, max_events, timeout_ms); - return std::max(res, 0); -} - -//----------------------------------------------------------------------------- - SingleFdSelector::SingleFdSelector(int fd) : _fd(fd), _selector() diff --git a/vespalib/src/vespa/vespalib/net/selector.h b/vespalib/src/vespa/vespalib/net/selector.h index 8b9d1c5f5d8..6b1967ddcd9 100644 --- a/vespalib/src/vespa/vespalib/net/selector.h +++ b/vespalib/src/vespa/vespalib/net/selector.h @@ -2,52 +2,16 @@ #pragma once +#include "wakeup_pipe.h" +#ifdef __APPLE__ +#include "emulated_epoll.h" +#else +#include "native_epoll.h" +#endif #include <vector> -#include <sys/epoll.h> namespace vespalib { -//----------------------------------------------------------------------------- - -/** - * A wakeup pipe is a non-blocking pipe that is used to wake up a - * blocking call to epoll_wait. The pipe readability is part of the - * selection set and a wakeup is triggered by writing to the - * pipe. When a wakeup is detected, pending tokens will be read and - * discarded to avoid spurious wakeups in the future. - **/ -class WakeupPipe { -private: - int _pipe[2]; -public: - WakeupPipe(); - ~WakeupPipe(); - int get_read_fd() const { return _pipe[0]; } - void write_token(); - void read_tokens(); -}; - -//----------------------------------------------------------------------------- - -/** - * The Epoll class is a thin wrapper around the epoll related system - * calls. - **/ -class Epoll -{ -private: - int _epoll_fd; -public: - Epoll(); - ~Epoll(); - void add(int fd, void *ctx, bool read, bool write); - void update(int fd, void *ctx, bool read, bool write); - void remove(int fd); - size_t wait(epoll_event *events, size_t max_events, int timeout_ms); -}; - -//----------------------------------------------------------------------------- - /** * Simple class used to hold events extracted from a call to epoll_wait. **/ diff --git a/vespalib/src/vespa/vespalib/net/socket_utils.cpp b/vespalib/src/vespa/vespalib/net/socket_utils.cpp new file mode 100644 index 00000000000..658577e6f64 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/socket_utils.cpp @@ -0,0 +1,39 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "socket_utils.h" +#include <sys/socket.h> +#include <fcntl.h> +#include <unistd.h> +#include <cassert> + +namespace vespalib::socketutils { + +void set_blocking(int fd, bool blocking) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (blocking) { + flags &= ~O_NONBLOCK; + } else { + flags |= O_NONBLOCK; + } + int res = fcntl(fd, F_SETFL, flags); + assert(res == 0); +} + +void nonblocking_pipe(int pipefd[2]) +{ + int res = pipe(pipefd); + assert(res == 0); + set_blocking(pipefd[0], false); + set_blocking(pipefd[1], false); +} + +void nonblocking_socketpair(int domain, int type, int protocol, int socketfd[2]) +{ + int res = socketpair(domain, type, protocol, socketfd); + assert(res == 0); + set_blocking(socketfd[0], false); + set_blocking(socketfd[1], false); +} + +} diff --git a/vespalib/src/vespa/vespalib/net/socket_utils.h b/vespalib/src/vespa/vespalib/net/socket_utils.h new file mode 100644 index 00000000000..f01967d6da3 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/socket_utils.h @@ -0,0 +1,16 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace vespalib::socketutils { + +// Set blocking mode on file descriptor +void set_blocking(int fd, bool blocking); + +// Create a pipe and set it nonblocking +void nonblocking_pipe(int pipefd[2]); + +// Create a socket pair and set it nonblocking +void nonblocking_socketpair(int domain, int type, int protocol, int socketfd[2]); + +} diff --git a/vespalib/src/vespa/vespalib/net/wakeup_pipe.cpp b/vespalib/src/vespa/vespalib/net/wakeup_pipe.cpp new file mode 100644 index 00000000000..ffae14c7499 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/wakeup_pipe.cpp @@ -0,0 +1,35 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "wakeup_pipe.h" +#include "socket_utils.h" +#include <unistd.h> + +namespace vespalib { + +WakeupPipe::WakeupPipe() + : _pipe() +{ + socketutils::nonblocking_pipe(_pipe); +} + +WakeupPipe::~WakeupPipe() +{ + close(_pipe[0]); + close(_pipe[1]); +} + +void +WakeupPipe::write_token() +{ + char token = 'T'; + [[maybe_unused]] ssize_t res = write(_pipe[1], &token, 1); +} + +void +WakeupPipe::read_tokens() +{ + char token_trash[128]; + [[maybe_unused]] ssize_t res = read(_pipe[0], token_trash, sizeof(token_trash)); +} + +} diff --git a/vespalib/src/vespa/vespalib/net/wakeup_pipe.h b/vespalib/src/vespa/vespalib/net/wakeup_pipe.h new file mode 100644 index 00000000000..5c729e4d408 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/wakeup_pipe.h @@ -0,0 +1,27 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace vespalib { + +//----------------------------------------------------------------------------- + +/** + * A wakeup pipe is a non-blocking pipe that is used to wake up a + * blocking call to epoll_wait. The pipe readability is part of the + * selection set and a wakeup is triggered by writing to the + * pipe. When a wakeup is detected, pending tokens will be read and + * discarded to avoid spurious wakeups in the future. + **/ +class WakeupPipe { +private: + int _pipe[2]; +public: + WakeupPipe(); + ~WakeupPipe(); + int get_read_fd() const { return _pipe[0]; } + void write_token(); + void read_tokens(); +}; + +} |