summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHaavard <havardpe@yahoo-inc.com>2017-04-26 09:49:40 +0000
committerHaavard <havardpe@yahoo-inc.com>2017-04-26 13:20:20 +0000
commit05e22e2759ef6230ccf541e8d92396bd857a2ed2 (patch)
treef095afc7a5a7cf6e6589380b0f8dc37c78c9c2af /vespalib
parent34f2553a62bda3c21bdd74cdf0e085772e9493de (diff)
make socket selection more flexible
freely specify read/write selection handler no longer expected to map context to file descriptor selector no longer bound to handler selector templated on context directly also added listen failure warning for server sockets
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/net/selector/selector_test.cpp124
-rw-r--r--vespalib/src/tests/net/socket/socket_test.cpp34
-rw-r--r--vespalib/src/vespa/vespalib/net/selector.h22
-rw-r--r--vespalib/src/vespa/vespalib/net/server_socket.cpp3
-rw-r--r--vespalib/src/vespa/vespalib/net/server_socket.h1
5 files changed, 105 insertions, 79 deletions
diff --git a/vespalib/src/tests/net/selector/selector_test.cpp b/vespalib/src/tests/net/selector/selector_test.cpp
index 82494f5423f..f21e29b586d 100644
--- a/vespalib/src/tests/net/selector/selector_test.cpp
+++ b/vespalib/src/tests/net/selector/selector_test.cpp
@@ -34,54 +34,65 @@ struct Context {
}
};
-struct Handler {
- bool wakeup;
- using context_type = Context;
- int get_fd(Context &ctx) const { return ctx.fd; }
- void handle_wakeup() { wakeup = true; }
- void handle_event(Context &ctx, bool read, bool write) {
- ctx.can_read = read;
- ctx.can_write = write;
- }
- void reset() {
- wakeup = false;
- }
-};
-
struct Fixture {
- Handler handler;
- Selector<Handler> selector;
+ bool wakeup;
+ Selector<Context> selector;
std::vector<SocketPair> sockets;
std::vector<Context> contexts;
- Fixture(size_t size, bool write_enabled) : handler(), selector(handler, 1024), sockets(), contexts() {
+ Fixture(size_t size, bool read_enabled, bool write_enabled) : wakeup(false), selector(), sockets(), contexts() {
for (size_t i = 0; i < size; ++i) {
sockets.push_back(SocketPair::create());
contexts.push_back(Context(sockets.back().a.get()));
}
for (auto &ctx: contexts) {
- selector.add(ctx, write_enabled);
+ selector.add(ctx.fd, ctx, read_enabled, write_enabled);
}
}
+ void update(size_t idx, bool read, bool write) {
+ Context &ctx = contexts[idx];
+ selector.update(ctx.fd, ctx, read, write);
+ }
+ bool write(size_t idx, const char *str) {
+ size_t len = strlen(str);
+ ssize_t res = ::write(sockets[idx].b.get(), str, len);
+ return (res == ssize_t(len));
+ }
+ bool write_self(size_t idx, const char *str) {
+ size_t len = strlen(str);
+ ssize_t res = ::write(sockets[idx].a.get(), str, len);
+ return (res == ssize_t(len));
+ }
+ bool read(size_t idx, size_t len) {
+ char buf[128];
+ ssize_t res = ::read(sockets[idx].a.get(), buf, len);
+ return (res == ssize_t(len));
+ }
Fixture &reset() {
- handler.reset();
+ wakeup = false;
for (auto &ctx: contexts) {
ctx.reset();
}
return *this;
}
- Fixture &poll(int timeout_ms = 250000) {
+ Fixture &poll(int timeout_ms = 60000) {
selector.poll(timeout_ms);
- selector.dispatch();
+ selector.dispatch(*this);
return *this;
}
void verify(bool expect_wakeup, std::vector<std::pair<bool,bool> > expect_events) {
- EXPECT_EQUAL(expect_wakeup, handler.wakeup);
+ EXPECT_EQUAL(expect_wakeup, wakeup);
ASSERT_EQUAL(expect_events.size(), contexts.size());
for (size_t i = 0; i < expect_events.size(); ++i) {
EXPECT_EQUAL(expect_events[i].first, contexts[i].can_read);
EXPECT_EQUAL(expect_events[i].second, contexts[i].can_write);
}
}
+ // selector callbacks
+ void handle_wakeup() { wakeup = true; }
+ void handle_event(Context &ctx, bool read, bool write) {
+ ctx.can_read = read;
+ ctx.can_write = write;
+ }
};
constexpr std::pair<bool,bool> none = std::make_pair(false, false);
@@ -89,54 +100,73 @@ constexpr std::pair<bool,bool> in = std::make_pair(true, false);
constexpr std::pair<bool,bool> out = std::make_pair(false, true);
constexpr std::pair<bool,bool> both = std::make_pair(true, true);
-TEST_F("require that basic events trigger correctly", Fixture(1, true)) {
+TEST_F("require that basic events trigger correctly", Fixture(1, true, true)) {
TEST_DO(f1.reset().poll().verify(false, {out}));
- EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4);
+ EXPECT_TRUE(f1.write(0, "test"));
TEST_DO(f1.reset().poll().verify(false, {both}));
- f1.selector.disable_write(f1.contexts[0]);
+ f1.update(0, true, false);
TEST_DO(f1.reset().poll().verify(false, {in}));
- f1.selector.enable_write(f1.contexts[0]);
- TEST_DO(f1.reset().poll().verify(false, {both}));
+ f1.update(0, false, true);
+ TEST_DO(f1.reset().poll().verify(false, {out}));
+ f1.update(0, false, false);
+ TEST_DO(f1.reset().poll(10).verify(false, {none}));
+ f1.update(0, true, true);
f1.selector.wakeup();
TEST_DO(f1.reset().poll().verify(true, {both}));
TEST_DO(f1.reset().poll().verify(false, {both}));
}
-TEST_F("require that multiple sources can be selected on", Fixture(5, false)) {
- char buf[128];
+TEST_FFF("require that sources can be added with some events disabled",
+ Fixture(1, true, false), Fixture(1, false, true), Fixture(1, false, false))
+{
+ EXPECT_TRUE(f1.write(0, "test"));
+ EXPECT_TRUE(f2.write(0, "test"));
+ EXPECT_TRUE(f3.write(0, "test"));
+ TEST_DO(f1.reset().poll().verify(false, {in}));
+ TEST_DO(f2.reset().poll().verify(false, {out}));
+ TEST_DO(f3.reset().poll(10).verify(false, {none}));
+ f1.update(0, true, true);
+ f2.update(0, true, true);
+ f3.update(0, true, true);
+ TEST_DO(f1.reset().poll().verify(false, {both}));
+ TEST_DO(f2.reset().poll().verify(false, {both}));
+ TEST_DO(f3.reset().poll().verify(false, {both}));
+}
+
+TEST_F("require that multiple sources can be selected on", Fixture(5, true, false)) {
TEST_DO(f1.reset().poll(10).verify(false, {none, none, none, none, none}));
- EXPECT_EQUAL(write(f1.sockets[1].b.get(), "test", 4), 4);
- EXPECT_EQUAL(write(f1.sockets[3].b.get(), "test", 4), 4);
+ EXPECT_TRUE(f1.write(1, "test"));
+ EXPECT_TRUE(f1.write(3, "test"));
TEST_DO(f1.reset().poll().verify(false, {none, in, none, in, none}));
- EXPECT_EQUAL(read(f1.sockets[1].a.get(), buf, sizeof(buf)), 4);
- EXPECT_EQUAL(read(f1.sockets[3].a.get(), buf, 2), 2);
+ EXPECT_TRUE(f1.read(1, strlen("test")));
+ EXPECT_TRUE(f1.read(3, strlen("te")));
TEST_DO(f1.reset().poll().verify(false, {none, none, none, in, none}));
- EXPECT_EQUAL(read(f1.sockets[3].a.get(), buf, sizeof(buf)), 2);
+ EXPECT_TRUE(f1.read(3, strlen("st")));
TEST_DO(f1.reset().poll(10).verify(false, {none, none, none, none, none}));
}
-TEST_F("require that removed sources no longer produce events", Fixture(2, true)) {
+TEST_F("require that removed sources no longer produce events", Fixture(2, true, true)) {
TEST_DO(f1.reset().poll().verify(false, {out, out}));
- EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4);
- EXPECT_EQUAL(write(f1.sockets[1].b.get(), "test", 4), 4);
+ EXPECT_TRUE(f1.write(0, "test"));
+ EXPECT_TRUE(f1.write(1, "test"));
TEST_DO(f1.reset().poll().verify(false, {both, both}));
- f1.selector.remove(f1.contexts[0]);
+ f1.selector.remove(f1.contexts[0].fd);
TEST_DO(f1.reset().poll().verify(false, {none, both}));
}
-TEST_F("require that filling the output buffer disables write events", Fixture(1, true)) {
- EXPECT_EQUAL(write(f1.sockets[0].b.get(), "test", 4), 4);
+TEST_F("require that filling the output buffer disables write events", Fixture(1, true, true)) {
+ EXPECT_TRUE(f1.write(0, "test"));
TEST_DO(f1.reset().poll().verify(false, {both}));
size_t buffer_size = 0;
- while (write(f1.sockets[0].a.get(), "x", 1) == 1) {
+ while (f1.write_self(0, "x")) {
++buffer_size;
}
- EXPECT_EQUAL(errno, EWOULDBLOCK);
+ EXPECT_TRUE((errno == EWOULDBLOCK) || (errno == EAGAIN));
fprintf(stderr, "buffer size: %zu\n", buffer_size);
TEST_DO(f1.reset().poll().verify(false, {in}));
}
-TEST_MT_FF("require that selector can be woken while waiting for events", 2, Fixture(0, false), TimeBomb(60)) {
+TEST_MT_FF("require that selector can be woken while waiting for events", 2, Fixture(0, true, false), TimeBomb(60)) {
if (thread_id == 0) {
TEST_DO(f1.reset().poll().verify(true, {}));
} else {
@@ -145,16 +175,16 @@ TEST_MT_FF("require that selector can be woken while waiting for events", 2, Fix
}
}
-TEST_MT_FF("require that selection criteria can be changed while waiting for events", 2, Fixture(1, false), TimeBomb(60)) {
+TEST_MT_FF("require that selection criteria can be changed while waiting for events", 2, Fixture(1, true, false), TimeBomb(60)) {
if (thread_id == 0) {
TEST_DO(f1.reset().poll().verify(false, {out}));
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
- f1.selector.enable_write(f1.contexts[0]);
+ f1.update(0, true, true);
}
}
-TEST_MT_FF("require that selection sources can be added while waiting for events", 2, Fixture(0, false), TimeBomb(60)) {
+TEST_MT_FF("require that selection sources can be added while waiting for events", 2, Fixture(0, true, false), TimeBomb(60)) {
if (thread_id == 0) {
TEST_DO(f1.reset().poll().verify(false, {}));
TEST_BARRIER();
@@ -162,7 +192,7 @@ TEST_MT_FF("require that selection sources can be added while waiting for events
SocketPair pair = SocketPair::create();
Context ctx(pair.a.get());
std::this_thread::sleep_for(std::chrono::milliseconds(20));
- f1.selector.add(ctx, true);
+ f1.selector.add(ctx.fd, ctx, true, true);
TEST_BARRIER();
EXPECT_TRUE(ctx.can_write);
}
diff --git a/vespalib/src/tests/net/socket/socket_test.cpp b/vespalib/src/tests/net/socket/socket_test.cpp
index 67b4afb042a..55ef70d2d61 100644
--- a/vespalib/src/tests/net/socket/socket_test.cpp
+++ b/vespalib/src/tests/net/socket/socket_test.cpp
@@ -229,7 +229,7 @@ TEST("require that socket file is removed by server socket when destructed") {
ServerSocket server("ipc/file:my_socket");
EXPECT_TRUE(server.valid());
EXPECT_TRUE(is_socket("my_socket"));
- server = ServerSocket("invalid");
+ server = ServerSocket();
EXPECT_TRUE(!is_socket("my_socket"));
}
@@ -239,7 +239,7 @@ TEST("require that socket file is only removed on destruction if it is a socket"
EXPECT_TRUE(server.valid());
EXPECT_TRUE(is_socket("my_socket"));
replace_file("my_socket", "hello\n");
- server = ServerSocket("invalid");
+ server = ServerSocket();
EXPECT_TRUE(is_file("my_socket"));
remove_file("my_socket");
}
@@ -248,7 +248,7 @@ TEST("require that a server socket will fail to listen to a path that is already
replace_file("my_socket", "hello\n");
ServerSocket server("ipc/file:my_socket");
EXPECT_TRUE(!server.valid());
- server = ServerSocket("invalid");
+ server = ServerSocket();
EXPECT_TRUE(is_file("my_socket"));
remove_file("my_socket");
}
@@ -260,7 +260,7 @@ TEST("require that a server socket will fail to listen to a path that is already
EXPECT_TRUE(server1.valid());
EXPECT_TRUE(!server2.valid());
EXPECT_TRUE(is_socket("my_socket"));
- server1 = ServerSocket("invalid");
+ server1 = ServerSocket();
EXPECT_TRUE(!is_socket("my_socket"));
}
@@ -273,7 +273,7 @@ TEST("require that a server socket will remove an old socket file if it cannot b
EXPECT_TRUE(is_socket("my_socket"));
ServerSocket server("ipc/file:my_socket");
EXPECT_TRUE(server.valid());
- server = ServerSocket("invalid");
+ server = ServerSocket();
EXPECT_TRUE(!is_socket("my_socket"));
}
@@ -289,7 +289,7 @@ TEST("require that abstract socket names are freed when the server socket is des
vespalib::string spec = make_string("ipc/name:my_socket-%d", int(getpid()));
ServerSocket server1(spec);
EXPECT_TRUE(server1.valid());
- server1 = ServerSocket("invalid");
+ server1 = ServerSocket();
ServerSocket server2(spec);
EXPECT_TRUE(server2.valid());
}
@@ -385,33 +385,27 @@ SocketHandle connect_async(const SocketAddress &addr) {
SocketHandle handle;
bool connect_done = false;
int error = 0;
- };
- struct ConnectHandler {
- using context_type = ConnectContext;
- int get_fd(ConnectContext &ctx) const { return ctx.handle.get(); }
void handle_wakeup() {}
- void handle_event(ConnectContext &ctx, bool read, bool write) {
- (void) read;
- if (write) {
- ctx.connect_done = true;
- ctx.error = ctx.handle.get_so_error();
+ void handle_event(ConnectContext &ctx, bool, bool write) {
+ if ((&ctx == this) && write) {
+ connect_done = true;
+ error = ctx.handle.get_so_error();
}
}
};
- ConnectHandler handler;
- Selector<ConnectHandler> selector(handler, 4096);
+ Selector<ConnectContext> selector;
ConnectContext ctx;
ctx.handle = addr.connect_async();
EXPECT_TRUE(ctx.handle.valid());
test::SocketOptionsVerifier verifier(ctx.handle.get());
TEST_DO(verifier.verify_blocking(false));
if (ctx.handle.valid()) {
- selector.add(ctx, true);
+ selector.add(ctx.handle.get(), ctx, true, true);
while (!ctx.connect_done) {
selector.poll(1000);
- selector.dispatch();
+ selector.dispatch(ctx);
}
- selector.remove(ctx);
+ selector.remove(ctx.handle.get());
}
EXPECT_EQUAL(ctx.error, 0);
return std::move(ctx.handle);
diff --git a/vespalib/src/vespa/vespalib/net/selector.h b/vespalib/src/vespa/vespalib/net/selector.h
index 0ae993e97d6..b772a5fd9d5 100644
--- a/vespalib/src/vespa/vespalib/net/selector.h
+++ b/vespalib/src/vespa/vespalib/net/selector.h
@@ -68,41 +68,39 @@ public:
//-----------------------------------------------------------------------------
-template <typename Handler>
+template <typename Context>
class Selector
{
private:
- Handler &_handler;
Epoll _epoll;
WakeupPipe _wakeup_pipe;
EpollEvents _events;
public:
- using Context = typename Handler::context_type;
- Selector(Handler &handler, size_t max_events)
- : _handler(handler), _epoll(), _wakeup_pipe(), _events(max_events)
+ Selector()
+ : _epoll(), _wakeup_pipe(), _events(4096)
{
_epoll.add(_wakeup_pipe.get_read_fd(), nullptr, true, false);
}
~Selector() {
_epoll.remove(_wakeup_pipe.get_read_fd());
}
- void add(Context &ctx, bool write_enabled) { _epoll.add(_handler.get_fd(ctx), &ctx, true, write_enabled); }
- void enable_write(Context &ctx) { _epoll.update(_handler.get_fd(ctx), &ctx, true, true); }
- void disable_write(Context &ctx) { _epoll.update(_handler.get_fd(ctx), &ctx, true, false); }
- void remove(Context &ctx) { _epoll.remove(_handler.get_fd(ctx)); }
+ void add(int fd, Context &ctx, bool read, bool write) { _epoll.add(fd, &ctx, read, write); }
+ void update(int fd, Context &ctx, bool read, bool write) { _epoll.update(fd, &ctx, read, write); }
+ void remove(int fd) { _epoll.remove(fd); }
void wakeup() { _wakeup_pipe.write_token(); }
void poll(int timeout_ms) { _events.extract(_epoll, timeout_ms); }
size_t num_events() const { return _events.size(); }
- void dispatch() {
+ template <typename Handler>
+ void dispatch(Handler &handler) {
for (const auto &evt: _events) {
if (evt.data.ptr == nullptr) {
_wakeup_pipe.read_tokens();
- _handler.handle_wakeup();
+ handler.handle_wakeup();
} else {
Context &ctx = *((Context *)(evt.data.ptr));
bool read = ((evt.events & (EPOLLIN | EPOLLERR | EPOLLHUP)) != 0);
bool write = ((evt.events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) != 0);
- _handler.handle_event(ctx, read, write);
+ handler.handle_event(ctx, read, write);
}
}
}
diff --git a/vespalib/src/vespa/vespalib/net/server_socket.cpp b/vespalib/src/vespa/vespalib/net/server_socket.cpp
index 0f99e6e9840..9359f5824c0 100644
--- a/vespalib/src/vespa/vespalib/net/server_socket.cpp
+++ b/vespalib/src/vespa/vespalib/net/server_socket.cpp
@@ -35,6 +35,9 @@ ServerSocket::ServerSocket(const SocketSpec &spec)
_handle = spec.server_address().listen();
}
}
+ if (!_handle.valid()) {
+ LOG(warning, "listen failed: '%s'", spec.spec().c_str());
+ }
}
ServerSocket::ServerSocket(const vespalib::string &spec)
diff --git a/vespalib/src/vespa/vespalib/net/server_socket.h b/vespalib/src/vespa/vespalib/net/server_socket.h
index b61549277ac..3997aca4fbd 100644
--- a/vespalib/src/vespa/vespalib/net/server_socket.h
+++ b/vespalib/src/vespa/vespalib/net/server_socket.h
@@ -19,6 +19,7 @@ private:
static ServerSocket listen(const SocketSpec &spec);
void cleanup();
public:
+ ServerSocket() : _handle(), _path() {}
explicit ServerSocket(const SocketSpec &spec);
explicit ServerSocket(const vespalib::string &spec);
explicit ServerSocket(int port);