summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2019-03-15 12:19:56 +0100
committerTor Egge <Tor.Egge@broadpark.no>2019-03-15 13:26:48 +0100
commit8bf4079070df81a6b872365d20d48dbcecfba687 (patch)
treea4530c0d6ebe296f44a91469073850cc16d6d0bd /vespalib
parent2e8f039246b4b69303f2da48f37819a6c24ce887 (diff)
Perform basic epoll emulation on Darwin.
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/net/selector/selector_test.cpp4
-rw-r--r--vespalib/src/vespa/vespalib/net/CMakeLists.txt10
-rw-r--r--vespalib/src/vespa/vespalib/net/emulated_epoll.cpp108
-rw-r--r--vespalib/src/vespa/vespalib/net/emulated_epoll.h45
-rw-r--r--vespalib/src/vespa/vespalib/net/native_epoll.cpp65
-rw-r--r--vespalib/src/vespa/vespalib/net/native_epoll.h26
-rw-r--r--vespalib/src/vespa/vespalib/net/selector.cpp94
-rw-r--r--vespalib/src/vespa/vespalib/net/selector.h48
-rw-r--r--vespalib/src/vespa/vespalib/net/socket_utils.cpp39
-rw-r--r--vespalib/src/vespa/vespalib/net/socket_utils.h16
-rw-r--r--vespalib/src/vespa/vespalib/net/wakeup_pipe.cpp35
-rw-r--r--vespalib/src/vespa/vespalib/net/wakeup_pipe.h27
12 files changed, 379 insertions, 138 deletions
diff --git a/vespalib/src/tests/net/selector/selector_test.cpp b/vespalib/src/tests/net/selector/selector_test.cpp
index 302de712ef8..7d7a147bcb2 100644
--- a/vespalib/src/tests/net/selector/selector_test.cpp
+++ b/vespalib/src/tests/net/selector/selector_test.cpp
@@ -2,7 +2,7 @@
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/net/socket_address.h>
#include <vespa/vespalib/net/selector.h>
-#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/net/socket_utils.h>
#include <thread>
#include <functional>
#include <chrono>
@@ -18,7 +18,7 @@ struct SocketPair {
SocketPair &operator=(SocketPair &&) = default;
static SocketPair create() {
int sockets[2];
- ASSERT_EQUAL(0, socketpair(AF_UNIX, SOCK_STREAM | O_NONBLOCK, 0, sockets));
+ socketutils::nonblocking_socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
return SocketPair(sockets[0], sockets[1]);
}
~SocketPair() {}
diff --git a/vespalib/src/vespa/vespalib/net/CMakeLists.txt b/vespalib/src/vespa/vespalib/net/CMakeLists.txt
index 53f66c240fc..a71da833e16 100644
--- a/vespalib/src/vespa/vespalib/net/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/net/CMakeLists.txt
@@ -1,4 +1,11 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+if(CMAKE_HOST_SYSTEM_NAME STREQUAL "Darwin")
+ set(VESPA_EPOLL_FLAVOUR "emulated_epoll.cpp")
+else()
+ set(VESPA_EPOLL_FLAVOUR "native_epoll.cpp")
+endif()
+
vespa_add_library(vespalib_vespalib_net OBJECT
SOURCES
async_resolver.cpp
@@ -11,6 +18,9 @@ vespa_add_library(vespalib_vespalib_net OBJECT
socket_handle.cpp
socket_options.cpp
socket_spec.cpp
+ socket_utils.cpp
sync_crypto_socket.cpp
+ wakeup_pipe.cpp
+ ${VESPA_EPOLL_FLAVOUR}
DEPENDS
)
diff --git a/vespalib/src/vespa/vespalib/net/emulated_epoll.cpp b/vespalib/src/vespa/vespalib/net/emulated_epoll.cpp
new file mode 100644
index 00000000000..f4eb8ca3703
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/emulated_epoll.cpp
@@ -0,0 +1,108 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "emulated_epoll.h"
+#include <chrono>
+#include <cstring>
+#include <vector>
+
+namespace vespalib {
+
+Epoll::Epoll()
+ : _monitorlock(),
+ _wakeup(),
+ _monitored()
+{
+}
+
+Epoll::~Epoll() = default;
+
+uint32_t maybe(uint32_t value, bool yes) { return yes ? value : 0; }
+
+void
+Epoll::add(int fd, void *ctx, bool read, bool write)
+{
+ epoll_event evt;
+ evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write);
+ evt.data.ptr = ctx;
+ std::lock_guard guard(_monitorlock);
+ _monitored[fd] = evt;
+ _wakeup.write_token();
+}
+
+void
+Epoll::update(int fd, void *ctx, bool read, bool write)
+{
+ epoll_event evt;
+ evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write);
+ evt.data.ptr = ctx;
+ std::lock_guard guard(_monitorlock);
+ _monitored[fd] = evt;
+ _wakeup.write_token();
+}
+
+void
+Epoll::remove(int fd)
+{
+ epoll_event evt;
+ memset(&evt, 0, sizeof(evt));
+ std::lock_guard guard(_monitorlock);
+ _monitored.erase(fd);
+ _wakeup.write_token();
+}
+
+size_t
+Epoll::wait(epoll_event *events, size_t max_events, int timeout_ms)
+{
+ size_t evidx = 0;
+ std::vector<pollfd> fds;
+ bool allowRetry = true;
+ auto entryTime = std::chrono::steady_clock::now();
+ int timeout_ms_initial = timeout_ms;
+ (void) timeout_ms_initial;
+ for (;evidx == 0 && allowRetry;) {
+ {
+ std::lock_guard guard(_monitorlock);
+ fds.resize(_monitored.size() + 1);
+ fds[0].fd = _wakeup.get_read_fd();
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+ size_t fdidx = 1;
+ for (const auto &mon : _monitored) {
+ fds[fdidx].fd = mon.first;
+ fds[fdidx].events = mon.second.events;
+ fds[fdidx].revents = 0;
+ ++fdidx;
+ }
+ }
+ allowRetry = false;
+ int res = poll(&fds[0], fds.size(), timeout_ms);
+ if (res > 0) {
+ std::lock_guard guard(_monitorlock);
+ if (fds[0].revents != 0) { // Internal epoll emulation wakeup
+ _wakeup.read_tokens();
+ auto retryTime = std::chrono::steady_clock::now();
+ auto delay = std::chrono::duration<double, std::milli>(retryTime - entryTime).count();
+ if (delay < timeout_ms_initial) {
+ timeout_ms = timeout_ms_initial - 1 - delay;
+ allowRetry = true; // woken up by internal wakeup
+ }
+ }
+ for (size_t fdidx = 1; fdidx < fds.size() && evidx < max_events; ++fdidx) {
+ if (fds[fdidx].revents != 0) {
+ int fd = fds[fdidx].fd;
+ auto monitr = _monitored.find(fd);
+ if (monitr != _monitored.end()) {
+ events[evidx].events = fds[fdidx].revents;
+ events[evidx].data.ptr = monitr->second.data.ptr;
+ ++evidx;
+ }
+ }
+ }
+ } else {
+ return 0; // timeout
+ }
+ }
+ return evidx;
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/net/emulated_epoll.h b/vespalib/src/vespa/vespalib/net/emulated_epoll.h
new file mode 100644
index 00000000000..8d21bdfedaa
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/emulated_epoll.h
@@ -0,0 +1,45 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "wakeup_pipe.h"
+#include <poll.h>
+#include <map>
+#include <mutex>
+
+#define EPOLLERR POLLERR
+#define EPOLLHUP POLLHUP
+#define EPOLLIN POLLIN
+#define EPOLLOUT POLLOUT
+
+namespace vespalib {
+
+// structure describing which event occurred.
+struct epoll_event
+{
+ struct {
+ void *ptr;
+ } data;
+ uint32_t events;
+};
+
+/**
+ * The Epoll class is a thin wrapper around basic emulation of the epoll
+ * related system calls.
+ **/
+class Epoll
+{
+private:
+ std::mutex _monitorlock;
+ WakeupPipe _wakeup;
+ std::map<int, epoll_event> _monitored;
+public:
+ Epoll();
+ ~Epoll();
+ void add(int fd, void *ctx, bool read, bool write);
+ void update(int fd, void *ctx, bool read, bool write);
+ void remove(int fd);
+ size_t wait(epoll_event *events, size_t max_events, int timeout_ms);
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/net/native_epoll.cpp b/vespalib/src/vespa/vespalib/net/native_epoll.cpp
new file mode 100644
index 00000000000..7e3e62a5900
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/native_epoll.cpp
@@ -0,0 +1,65 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "native_epoll.h"
+#include <cassert>
+#include <cerrno>
+#include <unistd.h>
+#include <vespa/log/log.h>
+
+namespace vespalib {
+
+uint32_t maybe(uint32_t value, bool yes) { return yes ? value : 0; }
+
+void check(int res) {
+ if (res == -1) {
+ if (errno == ENOMEM) {
+ LOG_ABORT("out of memory");
+ }
+ }
+}
+
+Epoll::Epoll()
+ : _epoll_fd(epoll_create1(0))
+{
+ assert(_epoll_fd != -1);
+}
+
+Epoll::~Epoll()
+{
+ close(_epoll_fd);
+}
+
+void
+Epoll::add(int fd, void *ctx, bool read, bool write)
+{
+ epoll_event evt;
+ evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write);
+ evt.data.ptr = ctx;
+ check(epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &evt));
+}
+
+void
+Epoll::update(int fd, void *ctx, bool read, bool write)
+{
+ epoll_event evt;
+ evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write);
+ evt.data.ptr = ctx;
+ check(epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &evt));
+}
+
+void
+Epoll::remove(int fd)
+{
+ epoll_event evt;
+ memset(&evt, 0, sizeof(evt));
+ check(epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, &evt));
+}
+
+size_t
+Epoll::wait(epoll_event *events, size_t max_events, int timeout_ms)
+{
+ int res = epoll_wait(_epoll_fd, events, max_events, timeout_ms);
+ return std::max(res, 0);
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/net/native_epoll.h b/vespalib/src/vespa/vespalib/net/native_epoll.h
new file mode 100644
index 00000000000..032c64a7787
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/native_epoll.h
@@ -0,0 +1,26 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <sys/epoll.h>
+
+namespace vespalib {
+
+/**
+ * The Epoll class is a thin wrapper around the epoll related system
+ * calls.
+ **/
+class Epoll
+{
+private:
+ int _epoll_fd;
+public:
+ Epoll();
+ ~Epoll();
+ void add(int fd, void *ctx, bool read, bool write);
+ void update(int fd, void *ctx, bool read, bool write);
+ void remove(int fd);
+ size_t wait(epoll_event *events, size_t max_events, int timeout_ms);
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/net/selector.cpp b/vespalib/src/vespa/vespalib/net/selector.cpp
index 762849b81f0..0dcc852cf3a 100644
--- a/vespalib/src/vespa/vespalib/net/selector.cpp
+++ b/vespalib/src/vespa/vespalib/net/selector.cpp
@@ -1,13 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "selector.h"
-#include <cassert>
-#include <cstdlib>
-#include <cerrno>
-#include <cstring>
-#include <unistd.h>
-#include <fcntl.h>
-#include <vespa/log/log.h>
namespace vespalib {
@@ -35,97 +28,10 @@ struct SingleFdHandler {
}
};
-//-----------------------------------------------------------------------------
-
-uint32_t maybe(uint32_t value, bool yes) { return yes ? value : 0; }
-
-void check(int res) {
- if (res == -1) {
- if (errno == ENOMEM) {
- LOG_ABORT("out of memory");
- }
- }
-}
-
} // namespace vespalib::<unnamed>
//-----------------------------------------------------------------------------
-WakeupPipe::WakeupPipe()
- : _pipe()
-{
- int res = pipe2(_pipe, O_NONBLOCK);
- assert(res == 0);
-}
-
-WakeupPipe::~WakeupPipe()
-{
- close(_pipe[0]);
- close(_pipe[1]);
-}
-
-void
-WakeupPipe::write_token()
-{
- char token = 'T';
- [[maybe_unused]] ssize_t res = write(_pipe[1], &token, 1);
-}
-
-void
-WakeupPipe::read_tokens()
-{
- char token_trash[128];
- [[maybe_unused]] ssize_t res = read(_pipe[0], token_trash, sizeof(token_trash));
-}
-
-//-----------------------------------------------------------------------------
-
-Epoll::Epoll()
- : _epoll_fd(epoll_create1(0))
-{
- assert(_epoll_fd != -1);
-}
-
-Epoll::~Epoll()
-{
- close(_epoll_fd);
-}
-
-void
-Epoll::add(int fd, void *ctx, bool read, bool write)
-{
- epoll_event evt;
- evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write);
- evt.data.ptr = ctx;
- check(epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &evt));
-}
-
-void
-Epoll::update(int fd, void *ctx, bool read, bool write)
-{
- epoll_event evt;
- evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write);
- evt.data.ptr = ctx;
- check(epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &evt));
-}
-
-void
-Epoll::remove(int fd)
-{
- epoll_event evt;
- memset(&evt, 0, sizeof(evt));
- check(epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, &evt));
-}
-
-size_t
-Epoll::wait(epoll_event *events, size_t max_events, int timeout_ms)
-{
- int res = epoll_wait(_epoll_fd, events, max_events, timeout_ms);
- return std::max(res, 0);
-}
-
-//-----------------------------------------------------------------------------
-
SingleFdSelector::SingleFdSelector(int fd)
: _fd(fd),
_selector()
diff --git a/vespalib/src/vespa/vespalib/net/selector.h b/vespalib/src/vespa/vespalib/net/selector.h
index 8b9d1c5f5d8..6b1967ddcd9 100644
--- a/vespalib/src/vespa/vespalib/net/selector.h
+++ b/vespalib/src/vespa/vespalib/net/selector.h
@@ -2,52 +2,16 @@
#pragma once
+#include "wakeup_pipe.h"
+#ifdef __APPLE__
+#include "emulated_epoll.h"
+#else
+#include "native_epoll.h"
+#endif
#include <vector>
-#include <sys/epoll.h>
namespace vespalib {
-//-----------------------------------------------------------------------------
-
-/**
- * A wakeup pipe is a non-blocking pipe that is used to wake up a
- * blocking call to epoll_wait. The pipe readability is part of the
- * selection set and a wakeup is triggered by writing to the
- * pipe. When a wakeup is detected, pending tokens will be read and
- * discarded to avoid spurious wakeups in the future.
- **/
-class WakeupPipe {
-private:
- int _pipe[2];
-public:
- WakeupPipe();
- ~WakeupPipe();
- int get_read_fd() const { return _pipe[0]; }
- void write_token();
- void read_tokens();
-};
-
-//-----------------------------------------------------------------------------
-
-/**
- * The Epoll class is a thin wrapper around the epoll related system
- * calls.
- **/
-class Epoll
-{
-private:
- int _epoll_fd;
-public:
- Epoll();
- ~Epoll();
- void add(int fd, void *ctx, bool read, bool write);
- void update(int fd, void *ctx, bool read, bool write);
- void remove(int fd);
- size_t wait(epoll_event *events, size_t max_events, int timeout_ms);
-};
-
-//-----------------------------------------------------------------------------
-
/**
* Simple class used to hold events extracted from a call to epoll_wait.
**/
diff --git a/vespalib/src/vespa/vespalib/net/socket_utils.cpp b/vespalib/src/vespa/vespalib/net/socket_utils.cpp
new file mode 100644
index 00000000000..658577e6f64
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/socket_utils.cpp
@@ -0,0 +1,39 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "socket_utils.h"
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <cassert>
+
+namespace vespalib::socketutils {
+
+void set_blocking(int fd, bool blocking)
+{
+ int flags = fcntl(fd, F_GETFL, 0);
+ if (blocking) {
+ flags &= ~O_NONBLOCK;
+ } else {
+ flags |= O_NONBLOCK;
+ }
+ int res = fcntl(fd, F_SETFL, flags);
+ assert(res == 0);
+}
+
+void nonblocking_pipe(int pipefd[2])
+{
+ int res = pipe(pipefd);
+ assert(res == 0);
+ set_blocking(pipefd[0], false);
+ set_blocking(pipefd[1], false);
+}
+
+void nonblocking_socketpair(int domain, int type, int protocol, int socketfd[2])
+{
+ int res = socketpair(domain, type, protocol, socketfd);
+ assert(res == 0);
+ set_blocking(socketfd[0], false);
+ set_blocking(socketfd[1], false);
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/net/socket_utils.h b/vespalib/src/vespa/vespalib/net/socket_utils.h
new file mode 100644
index 00000000000..f01967d6da3
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/socket_utils.h
@@ -0,0 +1,16 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace vespalib::socketutils {
+
+// Set blocking mode on file descriptor
+void set_blocking(int fd, bool blocking);
+
+// Create a pipe and set it nonblocking
+void nonblocking_pipe(int pipefd[2]);
+
+// Create a socket pair and set it nonblocking
+void nonblocking_socketpair(int domain, int type, int protocol, int socketfd[2]);
+
+}
diff --git a/vespalib/src/vespa/vespalib/net/wakeup_pipe.cpp b/vespalib/src/vespa/vespalib/net/wakeup_pipe.cpp
new file mode 100644
index 00000000000..ffae14c7499
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/wakeup_pipe.cpp
@@ -0,0 +1,35 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "wakeup_pipe.h"
+#include "socket_utils.h"
+#include <unistd.h>
+
+namespace vespalib {
+
+WakeupPipe::WakeupPipe()
+ : _pipe()
+{
+ socketutils::nonblocking_pipe(_pipe);
+}
+
+WakeupPipe::~WakeupPipe()
+{
+ close(_pipe[0]);
+ close(_pipe[1]);
+}
+
+void
+WakeupPipe::write_token()
+{
+ char token = 'T';
+ [[maybe_unused]] ssize_t res = write(_pipe[1], &token, 1);
+}
+
+void
+WakeupPipe::read_tokens()
+{
+ char token_trash[128];
+ [[maybe_unused]] ssize_t res = read(_pipe[0], token_trash, sizeof(token_trash));
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/net/wakeup_pipe.h b/vespalib/src/vespa/vespalib/net/wakeup_pipe.h
new file mode 100644
index 00000000000..5c729e4d408
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/wakeup_pipe.h
@@ -0,0 +1,27 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace vespalib {
+
+//-----------------------------------------------------------------------------
+
+/**
+ * A wakeup pipe is a non-blocking pipe that is used to wake up a
+ * blocking call to epoll_wait. The pipe readability is part of the
+ * selection set and a wakeup is triggered by writing to the
+ * pipe. When a wakeup is detected, pending tokens will be read and
+ * discarded to avoid spurious wakeups in the future.
+ **/
+class WakeupPipe {
+private:
+ int _pipe[2];
+public:
+ WakeupPipe();
+ ~WakeupPipe();
+ int get_read_fd() const { return _pipe[0]; }
+ void write_token();
+ void read_tokens();
+};
+
+}