diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2017-04-04 18:07:22 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-04 18:07:22 +0200 |
commit | e9c418d113cee3075d07279f3589fa6a144e593b (patch) | |
tree | d63c6877ce1693cf077930bcbbbd7bd2c275e68b /vespalib/src | |
parent | 33c1b83ebe7dd703d755a49060cfc913e02a692a (diff) | |
parent | bff34eff1e03e2a80bde065e957846de140941a9 (diff) |
Merge pull request #2149 from yahoo/havardpe/epoll-selector-in-vespalib
epoll selector with test
Diffstat (limited to 'vespalib/src')
-rw-r--r-- | vespalib/src/tests/net/selector/CMakeLists.txt | 8 | ||||
-rw-r--r-- | vespalib/src/tests/net/selector/selector_test.cpp | 171 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/selector.cpp | 101 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/selector.h | 113 |
5 files changed, 394 insertions, 0 deletions
diff --git a/vespalib/src/tests/net/selector/CMakeLists.txt b/vespalib/src/tests/net/selector/CMakeLists.txt new file mode 100644 index 00000000000..85e7e818920 --- /dev/null +++ b/vespalib/src/tests/net/selector/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_selector_test_app TEST + SOURCES + selector_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_selector_test_app COMMAND vespalib_selector_test_app) diff --git a/vespalib/src/tests/net/selector/selector_test.cpp b/vespalib/src/tests/net/selector/selector_test.cpp new file mode 100644 index 00000000000..82494f5423f --- /dev/null +++ b/vespalib/src/tests/net/selector/selector_test.cpp @@ -0,0 +1,171 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/net/socket_address.h> +#include <vespa/vespalib/net/selector.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <thread> +#include <functional> +#include <chrono> + +using namespace vespalib; + +struct SocketPair { + SocketHandle a; + SocketHandle b; + SocketPair(int a_fd, int b_fd) : a(a_fd), b(b_fd) {} + SocketPair(SocketPair &&) = default; + SocketPair &operator=(SocketPair &&) = default; + static SocketPair create() { + int sockets[2]; + ASSERT_EQUAL(0, socketpair(AF_UNIX, SOCK_STREAM | O_NONBLOCK, 0, sockets)); + return SocketPair(sockets[0], sockets[1]); + } + ~SocketPair() {} +}; + +struct Context { + int fd; + bool can_read; + bool can_write; + Context(int fd_in) : fd(fd_in), can_read(false), can_write(false) {} + void reset() { + can_read = false; + can_write = false; + } +}; + +struct Handler { + bool wakeup; + using context_type = Context; + int get_fd(Context &ctx) const { return ctx.fd; } + void handle_wakeup() { wakeup = true; } + void handle_event(Context &ctx, bool read, bool write) { + ctx.can_read = read; + ctx.can_write = write; + } + void reset() { + wakeup = false; + } +}; + +struct Fixture { + Handler handler; + Selector<Handler> selector; + std::vector<SocketPair> sockets; + std::vector<Context> contexts; + Fixture(size_t size, bool write_enabled) : handler(), selector(handler, 1024), sockets(), contexts() { + for (size_t i = 0; i < size; ++i) { + sockets.push_back(SocketPair::create()); + contexts.push_back(Context(sockets.back().a.get())); + } + for (auto &ctx: contexts) { + selector.add(ctx, write_enabled); + } + } + Fixture &reset() { + handler.reset(); + for (auto &ctx: contexts) { + ctx.reset(); + } + return *this; + } + Fixture &poll(int timeout_ms = 250000) { + selector.poll(timeout_ms); + selector.dispatch(); + return *this; + } + void verify(bool expect_wakeup, std::vector<std::pair<bool,bool> > expect_events) { + EXPECT_EQUAL(expect_wakeup, handler.wakeup); + ASSERT_EQUAL(expect_events.size(), contexts.size()); + for (size_t i = 0; i < expect_events.size(); ++i) { + EXPECT_EQUAL(expect_events[i].first, contexts[i].can_read); + EXPECT_EQUAL(expect_events[i].second, contexts[i].can_write); + } + } +}; + +constexpr std::pair<bool,bool> none = std::make_pair(false, false); +constexpr std::pair<bool,bool> in = std::make_pair(true, false); +constexpr std::pair<bool,bool> out = std::make_pair(false, true); +constexpr std::pair<bool,bool> both = std::make_pair(true, true); + +TEST_F("require that basic events trigger correctly", Fixture(1, true)) { + TEST_DO(f1.reset().poll().verify(false, {out})); + EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4); + TEST_DO(f1.reset().poll().verify(false, {both})); + f1.selector.disable_write(f1.contexts[0]); + TEST_DO(f1.reset().poll().verify(false, {in})); + f1.selector.enable_write(f1.contexts[0]); + TEST_DO(f1.reset().poll().verify(false, {both})); + f1.selector.wakeup(); + TEST_DO(f1.reset().poll().verify(true, {both})); + TEST_DO(f1.reset().poll().verify(false, {both})); +} + +TEST_F("require that multiple sources can be selected on", Fixture(5, false)) { + char buf[128]; + TEST_DO(f1.reset().poll(10).verify(false, {none, none, none, none, none})); + EXPECT_EQUAL(write(f1.sockets[1].b.get(), "test", 4), 4); + EXPECT_EQUAL(write(f1.sockets[3].b.get(), "test", 4), 4); + TEST_DO(f1.reset().poll().verify(false, {none, in, none, in, none})); + EXPECT_EQUAL(read(f1.sockets[1].a.get(), buf, sizeof(buf)), 4); + EXPECT_EQUAL(read(f1.sockets[3].a.get(), buf, 2), 2); + TEST_DO(f1.reset().poll().verify(false, {none, none, none, in, none})); + EXPECT_EQUAL(read(f1.sockets[3].a.get(), buf, sizeof(buf)), 2); + TEST_DO(f1.reset().poll(10).verify(false, {none, none, none, none, none})); +} + +TEST_F("require that removed sources no longer produce events", Fixture(2, true)) { + TEST_DO(f1.reset().poll().verify(false, {out, out})); + EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4); + EXPECT_EQUAL(write(f1.sockets[1].b.get(), "test", 4), 4); + TEST_DO(f1.reset().poll().verify(false, {both, both})); + f1.selector.remove(f1.contexts[0]); + TEST_DO(f1.reset().poll().verify(false, {none, both})); +} + +TEST_F("require that filling the output buffer disables write events", Fixture(1, true)) { + EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4); + TEST_DO(f1.reset().poll().verify(false, {both})); + size_t buffer_size = 0; + while (write(f1.sockets[0].a.get(), "x", 1) == 1) { + ++buffer_size; + } + EXPECT_EQUAL(errno, EWOULDBLOCK); + fprintf(stderr, "buffer size: %zu\n", buffer_size); + TEST_DO(f1.reset().poll().verify(false, {in})); +} + +TEST_MT_FF("require that selector can be woken while waiting for events", 2, Fixture(0, false), TimeBomb(60)) { + if (thread_id == 0) { + TEST_DO(f1.reset().poll().verify(true, {})); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + f1.selector.wakeup(); + } +} + +TEST_MT_FF("require that selection criteria can be changed while waiting for events", 2, Fixture(1, false), TimeBomb(60)) { + if (thread_id == 0) { + TEST_DO(f1.reset().poll().verify(false, {out})); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + f1.selector.enable_write(f1.contexts[0]); + } +} + +TEST_MT_FF("require that selection sources can be added while waiting for events", 2, Fixture(0, false), TimeBomb(60)) { + if (thread_id == 0) { + TEST_DO(f1.reset().poll().verify(false, {})); + TEST_BARRIER(); + } else { + SocketPair pair = SocketPair::create(); + Context ctx(pair.a.get()); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + f1.selector.add(ctx, true); + TEST_BARRIER(); + EXPECT_TRUE(ctx.can_write); + } +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/net/CMakeLists.txt b/vespalib/src/vespa/vespalib/net/CMakeLists.txt index 05eb0879152..40b8b656db7 100644 --- a/vespalib/src/vespa/vespalib/net/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/net/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(vespalib_vespalib_net OBJECT SOURCES + selector.cpp server_socket.cpp socket.cpp socket_address.cpp diff --git a/vespalib/src/vespa/vespalib/net/selector.cpp b/vespalib/src/vespa/vespalib/net/selector.cpp new file mode 100644 index 00000000000..01027e5b762 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/selector.cpp @@ -0,0 +1,101 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include "selector.h" + +namespace vespalib { + +namespace { + +//----------------------------------------------------------------------------- + +uint32_t maybe(uint32_t value, bool yes) { return yes ? value : 0; } + +void check(int res) { + if (res == -1) { + if (errno == ENOMEM) { + abort(); + } + } +} + +} // namespace vespalib::<unnamed> + +//----------------------------------------------------------------------------- + +WakeupPipe::WakeupPipe() + : _pipe() +{ + int res = pipe2(_pipe, O_NONBLOCK); + assert(res == 0); +} + +WakeupPipe::~WakeupPipe() +{ + close(_pipe[0]); + close(_pipe[1]); +} + +void +WakeupPipe::write_token() +{ + char token = 'T'; + write(_pipe[1], &token, 1); +} + +void +WakeupPipe::read_tokens() +{ + char token_trash[128]; + read(_pipe[0], token_trash, sizeof(token_trash)); +} + +//----------------------------------------------------------------------------- + +Epoll::Epoll() + : _epoll_fd(epoll_create1(0)) +{ + assert(_epoll_fd != -1); +} + +Epoll::~Epoll() +{ + close(_epoll_fd); +} + +void +Epoll::add(int fd, void *ctx, bool read, bool write) +{ + epoll_event evt; + evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write); + evt.data.ptr = ctx; + check(epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &evt)); +} + +void +Epoll::update(int fd, void *ctx, bool read, bool write) +{ + epoll_event evt; + evt.events = maybe(EPOLLIN, read) | maybe(EPOLLOUT, write); + evt.data.ptr = ctx; + check(epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &evt)); +} + +void +Epoll::remove(int fd) +{ + epoll_event evt; + memset(&evt, 0, sizeof(evt)); + check(epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, &evt)); +} + +size_t +Epoll::wait(epoll_event *events, size_t max_events, int timeout_ms) +{ + int res = epoll_wait(_epoll_fd, events, max_events, timeout_ms); + return std::max(res, 0); +} + +//----------------------------------------------------------------------------- + +} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/net/selector.h b/vespalib/src/vespa/vespalib/net/selector.h new file mode 100644 index 00000000000..0ae993e97d6 --- /dev/null +++ b/vespalib/src/vespa/vespalib/net/selector.h @@ -0,0 +1,113 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vector> +#include <sys/epoll.h> + +namespace vespalib { + +//----------------------------------------------------------------------------- + +/** + * A wakeup pipe is a non-blocking pipe that is used to wake up a + * blocking call to epoll_wait. The pipe readability is part of the + * selection set and a wakeup is triggered by writing to the + * pipe. When a wakeup is detected, pending tokens will be read and + * discarded to avoid spurious wakeups in the future. + **/ +class WakeupPipe { +private: + int _pipe[2]; +public: + WakeupPipe(); + ~WakeupPipe(); + int get_read_fd() const { return _pipe[0]; } + void write_token(); + void read_tokens(); +}; + +//----------------------------------------------------------------------------- + +/** + * The Epoll class is a thin wrapper around the epoll related system + * calls. + **/ +class Epoll +{ +private: + int _epoll_fd; +public: + Epoll(); + ~Epoll(); + void add(int fd, void *ctx, bool read, bool write); + void update(int fd, void *ctx, bool read, bool write); + void remove(int fd); + size_t wait(epoll_event *events, size_t max_events, int timeout_ms); +}; + +//----------------------------------------------------------------------------- + +/** + * Simple class used to hold events extracted from a call to epoll_wait. + **/ +class EpollEvents +{ +private: + std::vector<epoll_event> _epoll_events; + size_t _num_events; +public: + EpollEvents(size_t max_events) : _epoll_events(max_events), _num_events(0) {} + void extract(Epoll &epoll, int timeout_ms) { + _num_events = epoll.wait(&_epoll_events[0], _epoll_events.size(), timeout_ms); + } + const epoll_event *begin() const { return &_epoll_events[0]; } + const epoll_event *end() const { return &_epoll_events[_num_events]; } + size_t size() const { return _num_events; } +}; + +//----------------------------------------------------------------------------- + +template <typename Handler> +class Selector +{ +private: + Handler &_handler; + Epoll _epoll; + WakeupPipe _wakeup_pipe; + EpollEvents _events; +public: + using Context = typename Handler::context_type; + Selector(Handler &handler, size_t max_events) + : _handler(handler), _epoll(), _wakeup_pipe(), _events(max_events) + { + _epoll.add(_wakeup_pipe.get_read_fd(), nullptr, true, false); + } + ~Selector() { + _epoll.remove(_wakeup_pipe.get_read_fd()); + } + void add(Context &ctx, bool write_enabled) { _epoll.add(_handler.get_fd(ctx), &ctx, true, write_enabled); } + void enable_write(Context &ctx) { _epoll.update(_handler.get_fd(ctx), &ctx, true, true); } + void disable_write(Context &ctx) { _epoll.update(_handler.get_fd(ctx), &ctx, true, false); } + void remove(Context &ctx) { _epoll.remove(_handler.get_fd(ctx)); } + void wakeup() { _wakeup_pipe.write_token(); } + void poll(int timeout_ms) { _events.extract(_epoll, timeout_ms); } + size_t num_events() const { return _events.size(); } + void dispatch() { + for (const auto &evt: _events) { + if (evt.data.ptr == nullptr) { + _wakeup_pipe.read_tokens(); + _handler.handle_wakeup(); + } else { + Context &ctx = *((Context *)(evt.data.ptr)); + bool read = ((evt.events & (EPOLLIN | EPOLLERR | EPOLLHUP)) != 0); + bool write = ((evt.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) != 0); + _handler.handle_event(ctx, read, write); + } + } + } +}; + +//----------------------------------------------------------------------------- + +} // namespace vespalib |