summaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHaavard <havardpe@yahoo-inc.com>2017-04-26 11:34:42 +0000
committerHaavard <havardpe@yahoo-inc.com>2017-04-26 14:57:04 +0000
commit678c6f5720bfdd55ab288572c336c5f3d67d3aae (patch)
tree0881f68de193fe0c185ef7f229a9bd6e4f1b4fe1 /fnet
parent05e22e2759ef6230ccf541e8d92396bd857a2ed2 (diff)
use vespalib sockets in fnet
fixed bug: - handle execute command before checking IOC delete flag
Diffstat (limited to 'fnet')
-rw-r--r--fnet/src/tests/info/info.cpp4
-rw-r--r--fnet/src/vespa/fnet/connection.cpp81
-rw-r--r--fnet/src/vespa/fnet/connection.h12
-rw-r--r--fnet/src/vespa/fnet/connector.cpp75
-rw-r--r--fnet/src/vespa/fnet/connector.h26
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp37
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h26
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp319
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h21
9 files changed, 232 insertions, 369 deletions
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index 16d9d548ebf..383a5d018b9 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -70,10 +70,10 @@ TEST("info") {
TEST("size of important objects")
{
- EXPECT_EQUAL(184u, sizeof(FNET_IOComponent));
+ EXPECT_EQUAL(192u, sizeof(FNET_IOComponent));
EXPECT_EQUAL(32u, sizeof(FNET_Channel));
EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock));
- EXPECT_EQUAL(472u, sizeof(FNET_Connection));
+ EXPECT_EQUAL(480u, sizeof(FNET_Connection));
EXPECT_EQUAL(96u, sizeof(FastOS_Cond));
EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer));
EXPECT_EQUAL(24u, sizeof(FastOS_Time));
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index cc2e846741b..a3208503bb9 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -8,7 +8,6 @@
#include "iserveradapter.h"
#include "config.h"
#include "transport_thread.h"
-#include <vespa/fastos/socket.h>
#include <vespa/log/log.h>
LOG_SETUP(".fnet");
@@ -213,11 +212,10 @@ FNET_Connection::Read()
uint32_t readPackets = 0; // total packets read
int readCnt = 0; // read count
bool broken = false; // is this conn broken ?
- int error; // socket error code
ssize_t res; // single read result
_input.EnsureFree(FNET_READ_SIZE);
- res = _socket->Read(_input.GetFree(), _input.GetFreeLen());
+ res = _socket.read(_input.GetFree(), _input.GetFreeLen());
readCnt++;
while (res > 0) {
@@ -248,7 +246,7 @@ FNET_Connection::Read()
goto done_read;
_input.EnsureFree(FNET_READ_SIZE);
- res = _socket->Read(_input.GetFree(), _input.GetFreeLen());
+ res = _socket.read(_input.GetFree(), _input.GetFreeLen());
readCnt++;
}
@@ -271,13 +269,9 @@ done_read:
if (res == 0) {
broken = true; // handle EOF
} else { // res < 0
- error = FastOS_Socket::GetLastError();
- broken = (error != FastOS_Socket::ERR_WOULDBLOCK &&
- error != FastOS_Socket::ERR_AGAIN);
-
- if (broken && error != FastOS_Socket::ERR_CONNRESET) {
-
- LOG(debug, "Connection(%s): read error: %d", GetSpec(), error);
+ broken = ((errno != EWOULDBLOCK) && (errno != EAGAIN));
+ if (broken && (errno != ECONNRESET)) {
+ LOG(debug, "Connection(%s): read error: %d", GetSpec(), errno);
}
}
}
@@ -293,7 +287,6 @@ FNET_Connection::Write(bool direct)
uint32_t writtenPackets = 0; // total packets written
int writeCnt = 0; // write count
bool broken = false; // is this conn broken ?
- int error = 0; // no error (yet)
ssize_t res; // single write result
FNET_Packet *packet;
@@ -322,7 +315,7 @@ FNET_Connection::Write(bool direct)
// write data
- res = _socket->Write(_output.GetData(), _output.GetDataLen());
+ res = _socket.write(_output.GetData(), _output.GetDataLen());
writeCnt++;
if (res > 0) {
_output.DataToDead((uint32_t)res);
@@ -342,12 +335,9 @@ FNET_Connection::Write(bool direct)
}
if (res < 0) {
- error = FastOS_Socket::GetLastError();
- broken = (error != FastOS_Socket::ERR_WOULDBLOCK &&
- error != FastOS_Socket::ERR_AGAIN);
-
- if (broken) {
- LOG(debug, "Connection(%s): write error: %d", GetSpec(), error);
+ broken = ((errno != EWOULDBLOCK) && (errno != EAGAIN));
+ if (broken && (errno != ECONNRESET)) {
+ LOG(debug, "Connection(%s): write error: %d", GetSpec(), errno);
}
}
@@ -397,13 +387,13 @@ FNET_Connection::Write(bool direct)
FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
FNET_IPacketStreamer *streamer,
FNET_IServerAdapter *serverAdapter,
- FastOS_SocketInterface *mySocket,
+ vespalib::SocketHandle socket,
const char *spec)
- : FNET_IOComponent(owner, mySocket, spec, /* time-out = */ true),
+ : FNET_IOComponent(owner, socket.get(), spec, /* time-out = */ true),
_streamer(streamer),
_serverAdapter(serverAdapter),
_adminChannel(nullptr),
- _socket(mySocket),
+ _socket(std::move(socket)),
_context(),
_state(FNET_CONNECTED), // <-- NB
_flags(),
@@ -420,7 +410,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_callbackTarget(nullptr),
_cleanup(nullptr)
{
- assert(_socket != nullptr);
+ assert(_socket.valid());
LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
}
@@ -432,13 +422,13 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
FNET_IPacketHandler *adminHandler,
FNET_Context adminContext,
FNET_Context context,
- FastOS_SocketInterface *mySocket,
+ vespalib::SocketHandle socket,
const char *spec)
- : FNET_IOComponent(owner, mySocket, spec, /* time-out = */ true),
+ : FNET_IOComponent(owner, socket.get(), spec, /* time-out = */ true),
_streamer(streamer),
_serverAdapter(serverAdapter),
_adminChannel(nullptr),
- _socket(mySocket),
+ _socket(std::move(socket)),
_context(context),
_state(FNET_CONNECTING),
_flags(),
@@ -455,7 +445,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_callbackTarget(nullptr),
_cleanup(nullptr)
{
- assert(_socket != nullptr);
+ assert(_socket.valid());
if (adminHandler != nullptr) {
FNET_Channel::UP admin(new FNET_Channel(FNET_NOID, this, adminHandler, adminContext));
_adminChannel = admin.get();
@@ -471,38 +461,21 @@ FNET_Connection::~FNET_Connection()
delete _adminChannel;
}
assert(_cleanup == nullptr);
- assert(_socket->GetSocketEvent() == nullptr);
assert(!_flags._writeLock);
- delete _socket;
}
bool
FNET_Connection::Init()
{
- bool rc = _socket->SetSoBlocking(false)
- && _socket->TuneTransport();
-
- if (rc) {
- if (GetConfig()->_tcpNoDelay)
- _socket->SetNoDelay(true);
- EnableReadEvent(true);
- if (IsClient()) {
- EnableWriteEvent(true);
- if (!_socket->Connect()) {
- int error = FastOS_Socket::GetLastError();
- if (error != FastOS_Socket::ERR_INPROGRESS &&
- error != FastOS_Socket::ERR_WOULDBLOCK)
- {
- rc = false;
- LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error);
- }
- }
- }
+ // set up relevant events
+ EnableReadEvent(true);
+ if (IsClient()) {
+ EnableWriteEvent(true);
}
// init server admin channel
- if (rc && CanAcceptChannels() && _adminChannel == nullptr) {
+ if (CanAcceptChannels() && _adminChannel == nullptr) {
FNET_Channel::UP ach(new FNET_Channel(FNET_NOID, this));
if (_serverAdapter->InitAdminChannel(ach.get())) {
AddRef_NoLock();
@@ -511,7 +484,7 @@ FNET_Connection::Init()
}
// handle close by admin channel init
- return (rc && _state <= FNET_CONNECTED);
+ return (_state <= FNET_CONNECTED);
}
@@ -680,10 +653,10 @@ FNET_Connection::CleanupHook()
void
FNET_Connection::Close()
{
- SetSocketEvent(nullptr);
+ detach_selector();
SetState(FNET_CLOSED);
- _socket->Shutdown();
- _socket->Close();
+ _ioc_socket_fd = -1;
+ _socket.reset();
}
@@ -715,7 +688,7 @@ FNET_Connection::HandleWriteEvent()
switch(_state) {
case FNET_CONNECTING:
- error = _socket->GetSoError();
+ error = _socket.get_so_error();
if (error == 0) { // connect ok
Lock();
_state = FNET_CONNECTED; // SetState(FNET_CONNECTED)
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index b84797624bf..3e37bcd583b 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -7,10 +7,10 @@
#include "context.h"
#include "channellookup.h"
#include "packetqueue.h"
+#include <vespa/vespalib/net/socket_handle.h>
class FNET_IPacketStreamer;
class FNET_IServerAdapter;
-class FastOS_SocketInterface;
class FNET_IPacketHandler;
/**
@@ -80,7 +80,7 @@ private:
FNET_IPacketStreamer *_streamer; // custom packet streamer
FNET_IServerAdapter *_serverAdapter; // only on server side
FNET_Channel *_adminChannel; // only on client side
- FastOS_SocketInterface *_socket; // socket for this conn
+ vespalib::SocketHandle _socket; // socket for this conn
FNET_Context _context; // connection context
State _state; // connection state
Flags _flags; // Packed flags.
@@ -213,13 +213,13 @@ public:
* @param owner the TransportThread object serving this connection
* @param streamer custom packet streamer
* @param serverAdapter object for custom channel creation
- * @param mySocket the underlying socket used for IO
+ * @param socket the underlying socket used for IO
* @param spec listen spec
**/
FNET_Connection(FNET_TransportThread *owner,
FNET_IPacketStreamer *streamer,
FNET_IServerAdapter *serverAdapter,
- FastOS_SocketInterface *mySocket,
+ vespalib::SocketHandle socket,
const char *spec);
/**
@@ -231,7 +231,7 @@ public:
* @param adminHandler packet handler for admin channel
* @param adminContext context for admin channel
* @param context initial context for this connection
- * @param mySocket the underlying socket used for IO
+ * @param socket the underlying socket used for IO
* @param spec connect spec
**/
FNET_Connection(FNET_TransportThread *owner,
@@ -240,7 +240,7 @@ public:
FNET_IPacketHandler *adminHandler,
FNET_Context adminContext,
FNET_Context context,
- FastOS_SocketInterface *mySocket,
+ vespalib::SocketHandle socket,
const char *spec);
/**
diff --git a/fnet/src/vespa/fnet/connector.cpp b/fnet/src/vespa/fnet/connector.cpp
index a14a798d7c6..1372b5dc6ba 100644
--- a/fnet/src/vespa/fnet/connector.cpp
+++ b/fnet/src/vespa/fnet/connector.cpp
@@ -4,95 +4,56 @@
#include "transport_thread.h"
#include "transport.h"
#include "connection.h"
-#include <vespa/fastos/serversocket.h>
#include <vespa/log/log.h>
LOG_SETUP(".fnet");
+using vespalib::SocketHandle;
FNET_Connector::FNET_Connector(FNET_TransportThread *owner,
FNET_IPacketStreamer *streamer,
FNET_IServerAdapter *serverAdapter,
const char *spec,
- int port, int backlog,
- FastOS_SocketFactory *factory,
- const char *strictBindHostName)
- : FNET_IOComponent(owner, nullptr, spec, /* time-out = */ false),
+ vespalib::ServerSocket server_socket)
+ : FNET_IOComponent(owner, server_socket.get_fd(), spec, /* time-out = */ false),
_streamer(streamer),
_serverAdapter(serverAdapter),
- _serverSocket(nullptr),
- _strict(strictBindHostName != nullptr)
+ _server_socket(std::move(server_socket))
{
- _serverSocket = new FastOS_ServerSocket(port, backlog, factory,
- strictBindHostName);
- assert(_serverSocket != nullptr);
- _ioc_socket = _serverSocket; // set socket 'manually'
}
-FNET_Connector::~FNET_Connector()
-{
- assert(_serverSocket->GetSocketEvent() == nullptr);
- delete _serverSocket;
-}
-
uint32_t
FNET_Connector::GetPortNumber() const {
- return _serverSocket->GetLocalPort();
-}
-
-bool
-FNET_Connector::Init()
-{
- bool rc = true;
-
- // check if strict binding went OK.
- if (_strict) {
- rc = _serverSocket->GetValidAddressFlag();
- }
-
- // configure socket for non-blocked listening
- rc = (rc && (_serverSocket->SetSoBlocking(false))
- && (_serverSocket->Listen()));
-
- // print some debug output [XXX: remove this later]
- if(rc) {
- LOG(debug, "Connector(%s): TCP listen OK", GetSpec());
- EnableReadEvent(true);
- } else {
- LOG(warning, "Connector(%s): TCP listen FAILED", GetSpec());
- }
- return rc;
+ return _server_socket.address().port();
}
void
FNET_Connector::Close()
{
- SetSocketEvent(nullptr);
- _serverSocket->Close();
+ detach_selector();
+ _ioc_socket_fd = -1;
+ _server_socket = vespalib::ServerSocket();
}
bool
FNET_Connector::HandleReadEvent()
{
- FastOS_Socket *newSocket = nullptr;
- FNET_Connection *conn = nullptr;
-
- newSocket = _serverSocket->AcceptPlain();
- if (newSocket != nullptr) {
+ SocketHandle handle = _server_socket.accept();
+ if (handle.valid()) {
FNET_Transport &transport = Owner()->owner();
- FNET_TransportThread *thread = transport.select_thread(newSocket, sizeof(FastOS_Socket));
- conn = new FNET_Connection(thread, _streamer, _serverAdapter, newSocket, GetSpec());
- if (conn->Init()) {
- conn->Owner()->Add(conn, false);
- } else {
- LOG(debug, "Connector(%s): failed to init incoming connection",
- GetSpec());
- delete conn; // this is legal.
+ FNET_TransportThread *thread = transport.select_thread(&handle, sizeof(handle));
+ if (thread->tune(handle)) {
+ std::unique_ptr<FNET_Connection> conn = std::make_unique<FNET_Connection>(thread, _streamer, _serverAdapter, std::move(handle), GetSpec());
+ if (conn->Init()) {
+ thread->Add(conn.release(), /*needRef = */ false);
+ } else {
+ LOG(debug, "Connector(%s): failed to init incoming connection", GetSpec());
+ }
}
}
return true;
diff --git a/fnet/src/vespa/fnet/connector.h b/fnet/src/vespa/fnet/connector.h
index 7463397afa4..3ca2cd8cd5a 100644
--- a/fnet/src/vespa/fnet/connector.h
+++ b/fnet/src/vespa/fnet/connector.h
@@ -3,11 +3,10 @@
#pragma once
#include "iocomponent.h"
+#include <vespa/vespalib/net/server_socket.h>
-class FastOS_ServerSocket;
class FNET_IPacketStreamer;
class FNET_IServerAdapter;
-class FastOS_SocketFactory;
/**
* Class used to listen for incoming connections on a single TCP/IP
@@ -18,8 +17,7 @@ class FNET_Connector : public FNET_IOComponent
private:
FNET_IPacketStreamer *_streamer;
FNET_IServerAdapter *_serverAdapter;
- FastOS_ServerSocket *_serverSocket;
- bool _strict;
+ vespalib::ServerSocket _server_socket;
FNET_Connector(const FNET_Connector &);
FNET_Connector &operator=(const FNET_Connector &);
@@ -32,20 +30,13 @@ public:
* @param streamer custom packet streamer
* @param serverAdapter object for custom channel creation
* @param spec listen spec for this connector
- * @param port the port to listen on
- * @param backlog accept queue length
- * @param factory custom socket factory
- * @param strictBindHostName bind strict to given hostname
+ * @param server_socket the underlying server socket
**/
FNET_Connector(FNET_TransportThread *owner,
FNET_IPacketStreamer *streamer,
FNET_IServerAdapter *serverAdapter,
const char *spec,
- int port, int backlog = 500,
- FastOS_SocketFactory *factory = nullptr,
- const char *strictBindHostName = nullptr);
- ~FNET_Connector();
-
+ vespalib::ServerSocket server_socket);
/**
* Obtain the port number of the underlying server socket.
@@ -55,15 +46,6 @@ public:
uint32_t GetPortNumber() const;
/**
- * Try to create a listening server socket at the port number
- * specified in the constructor. The socket is set to
- * non-blocking.
- *
- * @return true on success, false on fail
- **/
- bool Init();
-
- /**
* Close this connector. This method must be called in the transport
* thread in order to avoid race conditions related to socket event
* registration, deregistration and triggering.
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index f6154f04951..80122b3352f 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -2,18 +2,18 @@
#include "iocomponent.h"
#include "transport_thread.h"
-#include <vespa/fastos/socket.h>
FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner,
- FastOS_SocketInterface *mysocket,
+ int socket_fd,
const char *spec,
bool shouldTimeOut)
: _ioc_next(nullptr),
_ioc_prev(nullptr),
_ioc_owner(owner),
_ioc_counters(_ioc_owner->GetStatCounters()),
- _ioc_socket(mysocket),
+ _ioc_socket_fd(socket_fd),
+ _ioc_selector(nullptr),
_ioc_spec(nullptr),
_flags(shouldTimeOut),
_ioc_timestamp(fastos::ClockSystem::now()),
@@ -30,6 +30,7 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner,
FNET_IOComponent::~FNET_IOComponent()
{
free(_ioc_spec);
+ assert(_ioc_selector == nullptr);
}
FNET_Config *
@@ -98,25 +99,30 @@ FNET_IOComponent::SubRef_NoLock()
void
-FNET_IOComponent::SetSocketEvent(FastOS_SocketEvent *event)
+FNET_IOComponent::attach_selector(Selector &selector)
{
- bool rc = _ioc_socket->SetSocketEvent(event, this);
- assert(rc); // XXX: error handling
- (void) rc;
+ detach_selector();
+ _ioc_selector = &selector;
+ _ioc_selector->add(_ioc_socket_fd, *this, _flags._ioc_readEnabled, _flags._ioc_writeEnabled);
+}
+
- if (event != nullptr) {
- _ioc_socket->EnableReadEvent(_flags._ioc_readEnabled);
- _ioc_socket->EnableWriteEvent(_flags._ioc_writeEnabled);
+void
+FNET_IOComponent::detach_selector()
+{
+ if (_ioc_selector != nullptr) {
+ _ioc_selector->remove(_ioc_socket_fd);
}
+ _ioc_selector = nullptr;
}
-
void
FNET_IOComponent::EnableReadEvent(bool enabled)
{
_flags._ioc_readEnabled = enabled;
- if (_ioc_socket->GetSocketEvent() != nullptr)
- _ioc_socket->EnableReadEvent(enabled);
+ if (_ioc_selector != nullptr) {
+ _ioc_selector->update(_ioc_socket_fd, *this, _flags._ioc_readEnabled, _flags._ioc_writeEnabled);
+ }
}
@@ -124,8 +130,9 @@ void
FNET_IOComponent::EnableWriteEvent(bool enabled)
{
_flags._ioc_writeEnabled = enabled;
- if (_ioc_socket->GetSocketEvent() != nullptr)
- _ioc_socket->EnableWriteEvent(enabled);
+ if (_ioc_selector != nullptr) {
+ _ioc_selector->update(_ioc_socket_fd, *this, _flags._ioc_readEnabled, _flags._ioc_writeEnabled);
+ }
}
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index cb719c3051d..7e17f5679de 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -5,12 +5,11 @@
#include "stats.h"
#include <vespa/fastos/cond.h>
#include <vespa/fastos/timestamp.h>
+#include <vespa/vespalib/net/selector.h>
class FNET_TransportThread;
class FNET_StatCounters;
-class FastOS_SocketInterface;
class FNET_Config;
-class FastOS_SocketEvent;
/**
* This is the common superclass of all components that may be part of
@@ -25,6 +24,8 @@ class FNET_IOComponent
FNET_IOComponent(const FNET_IOComponent &);
FNET_IOComponent &operator=(const FNET_IOComponent &);
+ using Selector = vespalib::Selector<FNET_IOComponent>;
+
struct Flags {
Flags(bool shouldTimeout) :
_ioc_readEnabled(false),
@@ -44,7 +45,8 @@ protected:
FNET_IOComponent *_ioc_prev; // prev in list
FNET_TransportThread *_ioc_owner; // owner(TransportThread) ref.
FNET_StatCounters *_ioc_counters; // stat counters
- FastOS_SocketInterface *_ioc_socket; // source of events.
+ int _ioc_socket_fd; // source of events.
+ Selector *_ioc_selector; // attached event selector
char *_ioc_spec; // connect/listen spec
Flags _flags; // Compressed representation of boolean flags;
fastos::TimeStamp _ioc_timestamp; // last I/O activity
@@ -65,11 +67,11 @@ public:
* subclasses.
*
* @param owner the TransportThread object owning this component
- * @param mysocket the socket used by this IOC
+ * @param socket_fd the socket handle used by this IOC
* @param spec listen/connect spec for this IOC
* @param shouldTimeOut should this IOC time out if idle ?
**/
- FNET_IOComponent(FNET_TransportThread *owner, FastOS_SocketInterface *mysocket,
+ FNET_IOComponent(FNET_TransportThread *owner, int socket_fd,
const char *spec, bool shouldTimeOut);
@@ -267,13 +269,19 @@ public:
/**
- * Assign a FastOS_SocketEvent to this component. Before deleting an
- * IOC, one must assign nullptr as the socket event.
+ * Attach an event selector to this component. Before deleting an
+ * IOC, one must first call detach_selector to detach the
+ * selector.
*
- * @param event the socket event to register with.
+ * @param selector event selector to be attached.
**/
- void SetSocketEvent(FastOS_SocketEvent *event);
+ void attach_selector(Selector &selector);
+ /**
+ * Detach from the attached event selector. This will disable
+ * future selector events.
+ **/
+ void detach_selector();
/**
* Enable or disable read events.
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index e1a6d31a03e..1eed64b7e85 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -8,11 +8,16 @@
#include "connection.h"
#include "transport.h"
#include <vespa/vespalib/util/sync.h>
-#include <vespa/fastos/socket.h>
+#include <vespa/vespalib/net/socket_spec.h>
+#include <vespa/vespalib/net/server_socket.h>
#include <vespa/log/log.h>
LOG_SETUP(".fnet");
+using vespalib::ServerSocket;
+using vespalib::SocketHandle;
+using vespalib::SocketSpec;
+
namespace {
struct Sync : public FNET_IExecutable
@@ -25,37 +30,6 @@ struct Sync : public FNET_IExecutable
} // namespace<unnamed>
-
-char *
-SplitString(char *input, const char *sep, int &argc, char **argv, int maxargs)
-{
- int i;
- int sepcnt = strlen(sep);
-
- for (argc = 0, argv[0] = input; *input != '\0'; input++) {
- if (*input == '[' && argc == 0 && argv[argc] == input) {
- argv[argc] = ++input; // Skip '['
- for (; *input != ']' && *input != '\0'; ++input);
- if (*input == ']')
- *input++ = '\0'; // Replace ']'
- if (*input == '\0')
- break;
- }
- for (i = 0; i < sepcnt; i++) {
- if (*input == sep[i]) {
- *input = '\0';
- if (*(argv[argc]) != '\0' && ++argc >= maxargs)
- return (input + 1); // INCOMPLETE
- argv[argc] = (input + 1);
- break; // inner for loop
- }
- }
- }
- if (*(argv[argc]) != '\0')
- argc++;
- return nullptr; // COMPLETE
-}
-
#ifndef IAM_DOXYGEN
void
FNET_TransportThread::StatsTask::PerformTask()
@@ -158,7 +132,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
_queue.QueuePacket_NoLock(cpacket, context);
Unlock();
if (wasEmpty) {
- _socketEvent.AsyncWakeUp();
+ _selector.wakeup();
}
return true;
}
@@ -243,8 +217,7 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
_componentsTail(nullptr),
_componentCnt(0),
_deleteList(nullptr),
- _socketEvent(),
- _events(nullptr),
+ _selector(),
_queue(),
_myQueue(),
_cond(),
@@ -255,7 +228,6 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
_deleted(false)
{
_now.SetNow();
- assert(_socketEvent.GetCreateSuccess());
trapsigpipe();
}
@@ -271,47 +243,29 @@ FNET_TransportThread::~FNET_TransportThread()
}
+bool
+FNET_TransportThread::tune(SocketHandle &handle) const
+{
+ handle.set_keepalive(true);
+ handle.set_linger(true, 0);
+ handle.set_nodelay(_config._tcpNoDelay);
+ return handle.set_blocking(false);
+}
+
+
FNET_Connector*
FNET_TransportThread::Listen(const char *spec, FNET_IPacketStreamer *streamer,
FNET_IServerAdapter *serverAdapter)
{
- int speclen = strlen(spec);
- char tmp[1024];
- int argc;
- char *argv[32];
-
- assert(speclen < 1024);
- memcpy(tmp, spec, speclen);
- tmp[speclen] = '\0';
- if (SplitString(tmp, "/", argc, argv, 32) != nullptr
- || argc != 2)
- return nullptr; // wrong number of parameters
-
- // handle different connection types (currently only TCP/IP support)
- if (strcasecmp(argv[0], "tcp") == 0) {
- if (SplitString(argv[1], ":", argc, argv, 32) != nullptr
- || argc < 1 || argc > 2)
- return nullptr; // wrong number of parameters
-
- int port = atoi(argv[argc - 1]); // last param is port
- if (port < 0)
- return nullptr;
- if (port == 0 && strcmp(argv[argc - 1], "0") != 0)
- return nullptr;
- FNET_Connector *connector;
- connector = new FNET_Connector(this, streamer, serverAdapter, spec, port,
- 500, nullptr, (argc == 2) ? argv[0] : nullptr);
- if (connector->Init()) {
- connector->AddRef_NoLock();
- Add(connector, /* needRef = */ false);
- return connector;
- } else {
- delete connector;
- return nullptr;
- }
- } else {
- return nullptr;
+ ServerSocket server_socket{SocketSpec(spec)};
+ if (server_socket.valid() && server_socket.set_blocking(false)) {
+ FNET_Connector *connector = new FNET_Connector(this, streamer, serverAdapter, spec, std::move(server_socket));
+ connector->EnableReadEvent(true);
+ connector->AddRef_NoLock();
+ Add(connector, /* needRef = */ false);
+ return connector;
}
+ return nullptr;
}
@@ -322,43 +276,18 @@ FNET_TransportThread::Connect(const char *spec, FNET_IPacketStreamer *streamer,
FNET_IServerAdapter *serverAdapter,
FNET_Context connContext)
{
- int speclen = strlen(spec);
- char tmp[1024];
- int argc;
- char *argv[32];
-
- assert(speclen < 1024);
- memcpy(tmp, spec, speclen);
- tmp[speclen] = '\0';
- if (SplitString(tmp, "/", argc, argv, 32) != nullptr
- || argc != 2)
- return nullptr; // wrong number of parameters
-
- // handle different connection types (currently only TCP/IP support)
- if (strcasecmp(argv[0], "tcp") == 0) {
- if (SplitString(argv[1], ":", argc, argv, 32) != nullptr
- || argc != 2)
- return nullptr; // wrong number of parameters
-
- int port = atoi(argv[1]);
- if (port <= 0)
- return nullptr;
- FastOS_Socket *mysocket = new FastOS_Socket();
- mysocket->SetAddress(port, argv[0]);
- FNET_Connection *conn = new FNET_Connection(this, streamer, serverAdapter,
- adminHandler, adminContext,
- connContext, mysocket, spec);
+ auto tweak = [this](SocketHandle &handle) { return tune(handle); };
+ SocketHandle handle = SocketSpec(spec).client_address().connect(tweak);
+ if (handle.valid()) {
+ std::unique_ptr<FNET_Connection> conn = std::make_unique<FNET_Connection>(this, streamer, serverAdapter,
+ adminHandler, adminContext, connContext, std::move(handle), spec);
if (conn->Init()) {
conn->AddRef_NoLock();
- Add(conn, /* needRef = */ false);
- return conn;
- } else {
- delete conn;
- return nullptr;
+ Add(conn.get(), /*needRef = */ false);
+ return conn.release();
}
- } else {
- return nullptr;
}
+ return nullptr;
}
@@ -457,11 +386,12 @@ FNET_TransportThread::ShutDown(bool waitFinished)
wasEmpty = _queue.IsEmpty_NoLock();
}
Unlock();
- if (wasEmpty)
- _socketEvent.AsyncWakeUp();
-
- if (waitFinished)
+ if (wasEmpty) {
+ _selector.wakeup();
+ }
+ if (waitFinished) {
WaitFinished();
+ }
}
@@ -499,10 +429,6 @@ FNET_TransportThread::InitEventLoop()
LOG(error, "Transport: InitEventLoop: object was deleted!");
return false;
}
-
- _events = new FastOS_IOEvent[EVT_MAX];
- assert(_events != nullptr);
-
_now.SetNow();
_startTime = _now;
_statTime = _now;
@@ -511,16 +437,87 @@ FNET_TransportThread::InitEventLoop()
}
+void
+FNET_TransportThread::handle_wakeup()
+{
+ Lock();
+ CountEvent(_queue.FlushPackets_NoLock(&_myQueue));
+ Unlock();
+
+ FNET_Context context;
+ FNET_Packet *packet = nullptr;
+ while ((packet = _myQueue.DequeuePacket_NoLock(&context)) != nullptr) {
+
+ if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_EXECUTE) {
+ context._value.EXECUTABLE->execute();
+ continue;
+ }
+
+ if (context._value.IOC->_flags._ioc_delete) {
+ context._value.IOC->SubRef();
+ continue;
+ }
+
+ switch (packet->GetCommand()) {
+ case FNET_ControlPacket::FNET_CMD_IOC_ADD:
+ AddComponent(context._value.IOC);
+ context._value.IOC->_flags._ioc_added = true;
+ context._value.IOC->attach_selector(_selector);
+ break;
+ case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_READ:
+ context._value.IOC->EnableReadEvent(true);
+ context._value.IOC->SubRef();
+ break;
+ case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_READ:
+ context._value.IOC->EnableReadEvent(false);
+ context._value.IOC->SubRef();
+ break;
+ case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE:
+ context._value.IOC->EnableWriteEvent(true);
+ context._value.IOC->SubRef();
+ break;
+ case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_WRITE:
+ context._value.IOC->EnableWriteEvent(false);
+ context._value.IOC->SubRef();
+ break;
+ case FNET_ControlPacket::FNET_CMD_IOC_CLOSE:
+ if (context._value.IOC->_flags._ioc_added) {
+ RemoveComponent(context._value.IOC);
+ context._value.IOC->SubRef();
+ }
+ context._value.IOC->Close();
+ AddDeleteComponent(context._value.IOC);
+ break;
+ }
+ }
+}
+
+
+void
+FNET_TransportThread::handle_event(FNET_IOComponent &ctx, bool read, bool write)
+{
+ if (!ctx._flags._ioc_delete) {
+ bool rc = true;
+ if (read) {
+ rc = rc && ctx.HandleReadEvent();
+ }
+ if (write) {
+ rc = rc && ctx.HandleWriteEvent();
+ }
+ if (!rc) { // IOC is broken, close it
+ RemoveComponent(&ctx);
+ ctx.Close();
+ AddDeleteComponent(&ctx);
+ }
+ }
+}
+
+
bool
FNET_TransportThread::EventLoopIteration()
{
- FNET_Packet *packet = nullptr;
- FNET_Context context;
FNET_IOComponent *component = nullptr;
- int evt_cnt = 0;
- FastOS_IOEvent *events = _events;
int msTimeout = FNET_Scheduler::SLOT_TICK;
- bool wakeUp = false;
#ifdef FNET_SANITY_CHECKS
FastOS_Time beforeGetEvents;
@@ -537,7 +534,7 @@ FNET_TransportThread::EventLoopIteration()
#endif
// obtain I/O events
- evt_cnt = _socketEvent.GetEvents(&wakeUp, msTimeout, events, EVT_MAX);
+ _selector.poll(msTimeout);
CountEventLoop();
// sample current time (performed once per event loop iteration)
@@ -551,83 +548,9 @@ FNET_TransportThread::EventLoopIteration()
extractTime, msTimeout);
#endif
- // report event error (if any)
- if (evt_cnt < 0) {
- std::string str = FastOS_Socket::getLastErrorString();
- LOG(spam, "Transport: event error: %s", str.c_str());
- } else {
- CountIOEvent(evt_cnt);
- }
-
- // handle internal transport layer events
- if (wakeUp) {
-
- Lock();
- CountEvent(_queue.FlushPackets_NoLock(&_myQueue));
- Unlock();
-
- while ((packet = _myQueue.DequeuePacket_NoLock(&context)) != nullptr) {
-
- if (context._value.IOC->_flags._ioc_delete) {
- context._value.IOC->SubRef();
- continue;
- }
-
- switch (packet->GetCommand()) {
- case FNET_ControlPacket::FNET_CMD_IOC_ADD:
- AddComponent(context._value.IOC);
- context._value.IOC->_flags._ioc_added = true;
- context._value.IOC->SetSocketEvent(&_socketEvent);
- break;
- case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_READ:
- context._value.IOC->EnableReadEvent(true);
- context._value.IOC->SubRef();
- break;
- case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_READ:
- context._value.IOC->EnableReadEvent(false);
- context._value.IOC->SubRef();
- break;
- case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE:
- context._value.IOC->EnableWriteEvent(true);
- context._value.IOC->SubRef();
- break;
- case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_WRITE:
- context._value.IOC->EnableWriteEvent(false);
- context._value.IOC->SubRef();
- break;
- case FNET_ControlPacket::FNET_CMD_IOC_CLOSE:
- if (context._value.IOC->_flags._ioc_added) {
- RemoveComponent(context._value.IOC);
- context._value.IOC->SubRef();
- }
- context._value.IOC->Close();
- AddDeleteComponent(context._value.IOC);
- break;
- case FNET_ControlPacket::FNET_CMD_EXECUTE:
- context._value.EXECUTABLE->execute();
- break;
- }
- }
- }
-
- // handle I/O events
- for (int i = 0; i < evt_cnt; i++) {
-
- component = (FNET_IOComponent *) events[i]._eventAttribute;
- if (component == nullptr || component->_flags._ioc_delete)
- continue;
-
- bool rc = true;
- if (events[i]._readOccurred)
- rc = rc && component->HandleReadEvent();
- if (events[i]._writeOccurred)
- rc = rc && component->HandleWriteEvent();
- if (!rc) { // IOC is broken, close it
- RemoveComponent(component);
- component->Close();
- AddDeleteComponent(component);
- }
- }
+ // handle wakeup and io-events
+ CountIOEvent(_selector.num_events());
+ _selector.dispatch(*this);
// handle IOC time-outs
if (_config._iocTimeOut > 0) {
@@ -666,6 +589,8 @@ FNET_TransportThread::EventLoopIteration()
Unlock();
// discard remaining events
+ FNET_Context context;
+ FNET_Packet *packet = nullptr;
while ((packet = _myQueue.DequeuePacket_NoLock(&context)) != nullptr) {
if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_EXECUTE) {
context._value.EXECUTABLE->execute();
@@ -691,8 +616,6 @@ FNET_TransportThread::EventLoopIteration()
_queue.IsEmpty_NoLock() &&
_myQueue.IsEmpty_NoLock());
- delete [] _events;
-
Lock();
_finished = true;
if (_waitFinished)
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index a9e1f056d17..72402aa9e10 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -8,8 +8,9 @@
#include "packetqueue.h"
#include "stats.h"
#include <vespa/fastos/thread.h>
-#include <vespa/fastos/socketevent.h>
#include <vespa/fastos/time.h>
+#include <vespa/vespalib/net/socket_handle.h>
+#include <vespa/vespalib/net/selector.h>
class FNET_Transport;
class FNET_ControlPacket;
@@ -26,9 +27,7 @@ class FNET_TransportThread : public FastOS_Runnable
friend class FNET_IOComponent;
public:
- enum {
- EVT_MAX = 4096
- };
+ using Selector = vespalib::Selector<FNET_IOComponent>;
#ifndef IAM_DOXYGEN
class StatsTask : public FNET_Task
@@ -61,8 +60,7 @@ private:
FNET_IOComponent *_componentsTail; // I/O component list tail
uint32_t _componentCnt; // # of components
FNET_IOComponent *_deleteList; // IOC delete list
- FastOS_SocketEvent _socketEvent; // I/O event generator
- FastOS_IOEvent *_events; // I/O event array
+ Selector _selector; // I/O event generator
FNET_PacketQueue_NoLock _queue; // outer event queue
FNET_PacketQueue_NoLock _myQueue; // inner event queue
FastOS_Cond _cond; // used for synchronization
@@ -256,6 +254,11 @@ public:
**/
FNET_Transport &owner() const { return _owner; }
+ /**
+ * Tune the given socket handle to be used as an async transport
+ * connection.
+ **/
+ bool tune(vespalib::SocketHandle &handle) const;
/**
* Add a network listener in an abstract way. The given 'spec'
@@ -587,6 +590,12 @@ public:
bool InitEventLoop();
+ // selector call-back for selector wakeup
+ void handle_wakeup();
+
+ // selector call-back for io-events
+ void handle_event(FNET_IOComponent &ctx, bool read, bool write);
+
/**
* Perform a single transport thread event loop iteration. This
* method is called by the FRT_Transport::Run method. If you want to