summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHaavard <havardpe@yahoo-inc.com>2017-03-09 12:03:25 +0000
committerHaavard <havardpe@yahoo-inc.com>2017-04-04 12:58:34 +0000
commitd7cb442bd8132aa67f54f1874e4a3c46422d34ff (patch)
tree1be035f6a838697d17a2b417d76f10e5557fbae4 /vespalib
parentedd8622348ed6799b47eb318117002aa88878fcd (diff)
epoll selector with test
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/tests/net/selector/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/net/selector/selector_test.cpp177
-rw-r--r--vespalib/src/vespa/vespalib/net/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/net/selector.cpp101
-rw-r--r--vespalib/src/vespa/vespalib/net/selector.h113
6 files changed, 401 insertions, 0 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index b775cce68f2..39e56a5f97f 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -47,6 +47,7 @@ vespa_define_module(
src/tests/linkedptr
src/tests/make_fixture_macros
src/tests/memory
+ src/tests/net/selector
src/tests/net/socket
src/tests/objects/nbostream
src/tests/optimized
diff --git a/vespalib/src/tests/net/selector/CMakeLists.txt b/vespalib/src/tests/net/selector/CMakeLists.txt
new file mode 100644
index 00000000000..85e7e818920
--- /dev/null
+++ b/vespalib/src/tests/net/selector/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_selector_test_app TEST
+ SOURCES
+ selector_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_selector_test_app COMMAND vespalib_selector_test_app)
diff --git a/vespalib/src/tests/net/selector/selector_test.cpp b/vespalib/src/tests/net/selector/selector_test.cpp
new file mode 100644
index 00000000000..aeb5a848262
--- /dev/null
+++ b/vespalib/src/tests/net/selector/selector_test.cpp
@@ -0,0 +1,177 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/net/socket_address.h>
+#include <vespa/vespalib/net/selector.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <thread>
+#include <functional>
+#include <chrono>
+
+using namespace vespalib;
+
+struct SocketPair {
+ SocketHandle a;
+ SocketHandle b;
+ SocketPair(int a_fd, int b_fd) : a(a_fd), b(b_fd) {}
+ SocketPair(SocketPair &&) = default;
+ SocketPair &operator=(SocketPair &&) = default;
+ static SocketPair create() {
+ int sockets[2];
+ ASSERT_EQUAL(0, socketpair(AF_UNIX, SOCK_STREAM | O_NONBLOCK, 0, sockets));
+ return SocketPair(sockets[0], sockets[1]);
+ }
+ ~SocketPair() {}
+};
+
+struct Context {
+ int fd;
+ bool can_read;
+ bool can_write;
+ Context(int fd_in) : fd(fd_in), can_read(false), can_write(false) {}
+ void reset() {
+ can_read = false;
+ can_write = false;
+ }
+};
+
+struct Handler {
+ bool wakeup;
+ using context_type = Context;
+ int get_fd(Context &ctx) const { return ctx.fd; }
+ void handle_wakeup() { wakeup = true; }
+ void handle_event(Context &ctx, bool read, bool write) {
+ ctx.can_read = read;
+ ctx.can_write = write;
+ }
+ void reset() {
+ wakeup = false;
+ }
+};
+
+struct Fixture {
+ Handler handler;
+ Selector<Handler> selector;
+ std::vector<SocketPair> sockets;
+ std::vector<Context> contexts;
+ Fixture(size_t size) : handler(), selector(handler, 1024), sockets(), contexts() {
+ for (size_t i = 0; i < size; ++i) {
+ sockets.push_back(SocketPair::create());
+ contexts.push_back(Context(sockets.back().a.get()));
+ }
+ for (auto &ctx: contexts) {
+ selector.add(ctx);
+ }
+ }
+ Fixture &reset() {
+ handler.reset();
+ for (auto &ctx: contexts) {
+ ctx.reset();
+ }
+ return *this;
+ }
+ Fixture &poll(int timeout_ms = 250000) {
+ selector.poll(timeout_ms);
+ selector.dispatch();
+ return *this;
+ }
+ void verify(bool expect_wakeup, std::vector<std::pair<bool,bool> > expect_events) {
+ EXPECT_EQUAL(expect_wakeup, handler.wakeup);
+ ASSERT_EQUAL(expect_events.size(), contexts.size());
+ for (size_t i = 0; i < expect_events.size(); ++i) {
+ EXPECT_EQUAL(expect_events[i].first, contexts[i].can_read);
+ EXPECT_EQUAL(expect_events[i].second, contexts[i].can_write);
+ }
+ }
+};
+
+constexpr std::pair<bool,bool> none = std::make_pair(false, false);
+constexpr std::pair<bool,bool> in = std::make_pair(true, false);
+constexpr std::pair<bool,bool> out = std::make_pair(false, true);
+constexpr std::pair<bool,bool> both = std::make_pair(true, true);
+
+TEST_F("require that basic events trigger correctly", Fixture(1)) {
+ TEST_DO(f1.reset().poll().verify(false, {out}));
+ EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4);
+ TEST_DO(f1.reset().poll().verify(false, {both}));
+ f1.selector.disable_write(f1.contexts[0]);
+ TEST_DO(f1.reset().poll().verify(false, {in}));
+ f1.selector.enable_write(f1.contexts[0]);
+ TEST_DO(f1.reset().poll().verify(false, {both}));
+ f1.selector.wakeup();
+ TEST_DO(f1.reset().poll().verify(true, {both}));
+ TEST_DO(f1.reset().poll().verify(false, {both}));
+}
+
+TEST_F("require that multiple sources can be selected on", Fixture(5)) {
+ char buf[128];
+ for (auto &ctx: f1.contexts) {
+ f1.selector.disable_write(ctx);
+ }
+ TEST_DO(f1.reset().poll(10).verify(false, {none, none, none, none, none}));
+ EXPECT_EQUAL(write(f1.sockets[1].b.get(), "test", 4), 4);
+ EXPECT_EQUAL(write(f1.sockets[3].b.get(), "test", 4), 4);
+ TEST_DO(f1.reset().poll().verify(false, {none, in, none, in, none}));
+ EXPECT_EQUAL(read(f1.sockets[1].a.get(), buf, sizeof(buf)), 4);
+ EXPECT_EQUAL(read(f1.sockets[3].a.get(), buf, 2), 2);
+ TEST_DO(f1.reset().poll().verify(false, {none, none, none, in, none}));
+ EXPECT_EQUAL(read(f1.sockets[3].a.get(), buf, sizeof(buf)), 2);
+ TEST_DO(f1.reset().poll(10).verify(false, {none, none, none, none, none}));
+}
+
+TEST_F("require that removed sources no longer produce events", Fixture(2)) {
+ TEST_DO(f1.reset().poll().verify(false, {out, out}));
+ EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4);
+ EXPECT_EQUAL(write(f1.sockets[1].b.get(), "test", 4), 4);
+ TEST_DO(f1.reset().poll().verify(false, {both, both}));
+ f1.selector.remove(f1.contexts[0]);
+ TEST_DO(f1.reset().poll().verify(false, {none, both}));
+}
+
+TEST_F("require that filling the output buffer disables write events", Fixture(1)) {
+ EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4);
+ TEST_DO(f1.reset().poll().verify(false, {both}));
+ size_t buffer_size = 0;
+ while (write(f1.sockets[0].a.get(), "x", 1) == 1) {
+ ++buffer_size;
+ }
+ EXPECT_EQUAL(errno, EWOULDBLOCK);
+ fprintf(stderr, "buffer size: %zu\n", buffer_size);
+ TEST_DO(f1.reset().poll().verify(false, {in}));
+}
+
+TEST_MT_FF("require that selector can be woken while waiting for events", 2, Fixture(0), TimeBomb(60)) {
+ if (thread_id == 0) {
+ TEST_DO(f1.reset().poll().verify(true, {}));
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(20));
+ f1.selector.wakeup();
+ }
+}
+
+TEST_MT_FF("require that selection criteria can be changed while waiting for events", 2, Fixture(1), TimeBomb(60)) {
+ if (thread_id == 0) {
+ f1.selector.disable_write(f1.contexts[0]);
+ TEST_BARRIER();
+ TEST_DO(f1.reset().poll().verify(false, {out}));
+ } else {
+ TEST_BARRIER();
+ std::this_thread::sleep_for(std::chrono::milliseconds(20));
+ f1.selector.enable_write(f1.contexts[0]);
+ }
+}
+
+TEST_MT_FF("require that selection sources can be added while waiting for events", 2, Fixture(0), TimeBomb(60)) {
+ if (thread_id == 0) {
+ TEST_DO(f1.reset().poll().verify(false, {}));
+ TEST_BARRIER();
+ } else {
+ SocketPair pair = SocketPair::create();
+ Context ctx(pair.a.get());
+ std::this_thread::sleep_for(std::chrono::milliseconds(20));
+ f1.selector.add(ctx);
+ TEST_BARRIER();
+ EXPECT_TRUE(ctx.can_write);
+ }
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/net/CMakeLists.txt b/vespalib/src/vespa/vespalib/net/CMakeLists.txt
index 05eb0879152..40b8b656db7 100644
--- a/vespalib/src/vespa/vespalib/net/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/net/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(vespalib_vespalib_net OBJECT
SOURCES
+ selector.cpp
server_socket.cpp
socket.cpp
socket_address.cpp
diff --git a/vespalib/src/vespa/vespalib/net/selector.cpp b/vespalib/src/vespa/vespalib/net/selector.cpp
new file mode 100644
index 00000000000..01027e5b762
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/selector.cpp
@@ -0,0 +1,101 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include "selector.h"
+
+namespace vespalib {
+
+namespace {
+
+//-----------------------------------------------------------------------------
+
+uint32_t maybe(uint32_t value, bool yes) { return yes ? value : 0; }
+
+void check(int res) {
+ if (res == -1) {
+ if (errno == ENOMEM) {
+ abort();
+ }
+ }
+}
+
+} // namespace vespalib::<unnamed>
+
+//-----------------------------------------------------------------------------
+
+WakeupPipe::WakeupPipe()
+ : _pipe()
+{
+ int res = pipe2(_pipe, O_NONBLOCK);
+ assert(res == 0);
+}
+
+WakeupPipe::~WakeupPipe()
+{
+ close(_pipe[0]);
+ close(_pipe[1]);
+}
+
+void
+WakeupPipe::write_token()
+{
+ char token = 'T';
+ write(_pipe[1], &token, 1);
+}
+
+void
+WakeupPipe::read_tokens()
+{
+ char token_trash[128];
+ read(_pipe[0], token_trash, sizeof(token_trash));
+}
+
+//-----------------------------------------------------------------------------
+
+Epoll::Epoll()
+ : _epoll_fd(epoll_create1(0))
+{
+ assert(_epoll_fd != -1);
+}
+
+Epoll::~Epoll()
+{
+ close(_epoll_fd);
+}
+
+void
+Epoll::add(int fd, void *ctx, bool read, bool write)
+{
+ epoll_event evt;
+ evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write);
+ evt.data.ptr = ctx;
+ check(epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &evt));
+}
+
+void
+Epoll::update(int fd, void *ctx, bool read, bool write)
+{
+ epoll_event evt;
+ evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write);
+ evt.data.ptr = ctx;
+ check(epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &evt));
+}
+
+void
+Epoll::remove(int fd)
+{
+ epoll_event evt;
+ memset(&evt, 0, sizeof(evt));
+ check(epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, &evt));
+}
+
+size_t
+Epoll::wait(epoll_event *events, size_t max_events, int timeout_ms)
+{
+ int res = epoll_wait(_epoll_fd, events, max_events, timeout_ms);
+ return std::max(res, 0);
+}
+
+//-----------------------------------------------------------------------------
+
+} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/net/selector.h b/vespalib/src/vespa/vespalib/net/selector.h
new file mode 100644
index 00000000000..11a4be31a8e
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/net/selector.h
@@ -0,0 +1,113 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vector>
+#include <sys/epoll.h>
+
+namespace vespalib {
+
+//-----------------------------------------------------------------------------
+
+/**
+ * A wakeup pipe is a non-blocking pipe that is used to wake up a
+ * blocking call to epoll_wait. The pipe readability is part of the
+ * selection set and a wakeup is triggered by writing to the
+ * pipe. When a wakeup is detected, pending tokens will be read and
+ * discarded to avoid spurious wakeups in the future.
+ **/
+class WakeupPipe {
+private:
+ int _pipe[2];
+public:
+ WakeupPipe();
+ ~WakeupPipe();
+ int get_read_fd() const { return _pipe[0]; }
+ void write_token();
+ void read_tokens();
+};
+
+//-----------------------------------------------------------------------------
+
+/**
+ * The Epoll class is a thin wrapper around the epoll related system
+ * calls.
+ **/
+class Epoll
+{
+private:
+ int _epoll_fd;
+public:
+ Epoll();
+ ~Epoll();
+ void add(int fd, void *ctx, bool read, bool write);
+ void update(int fd, void *ctx, bool read, bool write);
+ void remove(int fd);
+ size_t wait(epoll_event *events, size_t max_events, int timeout_ms);
+};
+
+//-----------------------------------------------------------------------------
+
+/**
+ * Simple class used to hold events extracted from a call to epoll_wait.
+ **/
+class EpollEvents
+{
+private:
+ std::vector<epoll_event> _epoll_events;
+ size_t _num_events;
+public:
+ EpollEvents(size_t max_events) : _epoll_events(max_events), _num_events(0) {}
+ void extract(Epoll &epoll, int timeout_ms) {
+ _num_events = epoll.wait(&_epoll_events[0], _epoll_events.size(), timeout_ms);
+ }
+ const epoll_event *begin() const { return &_epoll_events[0]; }
+ const epoll_event *end() const { return &_epoll_events[_num_events]; }
+ size_t size() const { return _num_events; }
+};
+
+//-----------------------------------------------------------------------------
+
+template <typename Handler>
+class Selector
+{
+private:
+ Handler &_handler;
+ Epoll _epoll;
+ WakeupPipe _wakeup_pipe;
+ EpollEvents _events;
+public:
+ using Context = typename Handler::context_type;
+ Selector(Handler &handler, size_t max_events)
+ : _handler(handler), _epoll(), _wakeup_pipe(), _events(max_events)
+ {
+ _epoll.add(_wakeup_pipe.get_read_fd(), nullptr, true, false);
+ }
+ ~Selector() {
+ _epoll.remove(_wakeup_pipe.get_read_fd());
+ }
+ void add(Context &ctx) { _epoll.add(_handler.get_fd(ctx), &ctx, true, true); }
+ void enable_write(Context &ctx) { _epoll.update(_handler.get_fd(ctx), &ctx, true, true); }
+ void disable_write(Context &ctx) { _epoll.update(_handler.get_fd(ctx), &ctx, true, false); }
+ void remove(Context &ctx) { _epoll.remove(_handler.get_fd(ctx)); }
+ void wakeup() { _wakeup_pipe.write_token(); }
+ void poll(int timeout_ms) { _events.extract(_epoll, timeout_ms); }
+ size_t num_events() const { return _events.size(); }
+ void dispatch() {
+ for (const auto &evt: _events) {
+ if (evt.data.ptr == nullptr) {
+ _wakeup_pipe.read_tokens();
+ _handler.handle_wakeup();
+ } else {
+ Context &ctx = *((Context *)(evt.data.ptr));
+ bool read = ((evt.events & (EPOLLIN | EPOLLERR | EPOLLHUP)) != 0);
+ bool write = ((evt.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) != 0);
+ _handler.handle_event(ctx, read, write);
+ }
+ }
+ }
+};
+
+//-----------------------------------------------------------------------------
+
+} // namespace vespalib