diff options
author | Haavard <havardpe@yahoo-inc.com> | 2017-04-26 11:34:42 +0000 |
---|---|---|
committer | Haavard <havardpe@yahoo-inc.com> | 2017-04-26 14:57:04 +0000 |
commit | 678c6f5720bfdd55ab288572c336c5f3d67d3aae (patch) | |
tree | 0881f68de193fe0c185ef7f229a9bd6e4f1b4fe1 /fnet/src | |
parent | 05e22e2759ef6230ccf541e8d92396bd857a2ed2 (diff) |
use vespalib sockets in fnet
fixed bug:
- handle execute command before checking IOC delete flag
Diffstat (limited to 'fnet/src')
-rw-r--r-- | fnet/src/tests/info/info.cpp | 4 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 81 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 12 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connector.cpp | 75 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connector.h | 26 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.cpp | 37 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.h | 26 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 319 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 21 |
9 files changed, 232 insertions, 369 deletions
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index 16d9d548ebf..383a5d018b9 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -70,10 +70,10 @@ TEST("info") { TEST("size of important objects") { - EXPECT_EQUAL(184u, sizeof(FNET_IOComponent)); + EXPECT_EQUAL(192u, sizeof(FNET_IOComponent)); EXPECT_EQUAL(32u, sizeof(FNET_Channel)); EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock)); - EXPECT_EQUAL(472u, sizeof(FNET_Connection)); + EXPECT_EQUAL(480u, sizeof(FNET_Connection)); EXPECT_EQUAL(96u, sizeof(FastOS_Cond)); EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer)); EXPECT_EQUAL(24u, sizeof(FastOS_Time)); diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index cc2e846741b..a3208503bb9 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -8,7 +8,6 @@ #include "iserveradapter.h" #include "config.h" #include "transport_thread.h" -#include <vespa/fastos/socket.h> #include <vespa/log/log.h> LOG_SETUP(".fnet"); @@ -213,11 +212,10 @@ FNET_Connection::Read() uint32_t readPackets = 0; // total packets read int readCnt = 0; // read count bool broken = false; // is this conn broken ? - int error; // socket error code ssize_t res; // single read result _input.EnsureFree(FNET_READ_SIZE); - res = _socket->Read(_input.GetFree(), _input.GetFreeLen()); + res = _socket.read(_input.GetFree(), _input.GetFreeLen()); readCnt++; while (res > 0) { @@ -248,7 +246,7 @@ FNET_Connection::Read() goto done_read; _input.EnsureFree(FNET_READ_SIZE); - res = _socket->Read(_input.GetFree(), _input.GetFreeLen()); + res = _socket.read(_input.GetFree(), _input.GetFreeLen()); readCnt++; } @@ -271,13 +269,9 @@ done_read: if (res == 0) { broken = true; // handle EOF } else { // res < 0 - error = FastOS_Socket::GetLastError(); - broken = (error != FastOS_Socket::ERR_WOULDBLOCK && - error != FastOS_Socket::ERR_AGAIN); - - if (broken && error != FastOS_Socket::ERR_CONNRESET) { - - LOG(debug, "Connection(%s): read error: %d", GetSpec(), error); + broken = ((errno != EWOULDBLOCK) && (errno != EAGAIN)); + if (broken && (errno != ECONNRESET)) { + LOG(debug, "Connection(%s): read error: %d", GetSpec(), errno); } } } @@ -293,7 +287,6 @@ FNET_Connection::Write(bool direct) uint32_t writtenPackets = 0; // total packets written int writeCnt = 0; // write count bool broken = false; // is this conn broken ? - int error = 0; // no error (yet) ssize_t res; // single write result FNET_Packet *packet; @@ -322,7 +315,7 @@ FNET_Connection::Write(bool direct) // write data - res = _socket->Write(_output.GetData(), _output.GetDataLen()); + res = _socket.write(_output.GetData(), _output.GetDataLen()); writeCnt++; if (res > 0) { _output.DataToDead((uint32_t)res); @@ -342,12 +335,9 @@ FNET_Connection::Write(bool direct) } if (res < 0) { - error = FastOS_Socket::GetLastError(); - broken = (error != FastOS_Socket::ERR_WOULDBLOCK && - error != FastOS_Socket::ERR_AGAIN); - - if (broken) { - LOG(debug, "Connection(%s): write error: %d", GetSpec(), error); + broken = ((errno != EWOULDBLOCK) && (errno != EAGAIN)); + if (broken && (errno != ECONNRESET)) { + LOG(debug, "Connection(%s): write error: %d", GetSpec(), errno); } } @@ -397,13 +387,13 @@ FNET_Connection::Write(bool direct) FNET_Connection::FNET_Connection(FNET_TransportThread *owner, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter, - FastOS_SocketInterface *mySocket, + vespalib::SocketHandle socket, const char *spec) - : FNET_IOComponent(owner, mySocket, spec, /* time-out = */ true), + : FNET_IOComponent(owner, socket.get(), spec, /* time-out = */ true), _streamer(streamer), _serverAdapter(serverAdapter), _adminChannel(nullptr), - _socket(mySocket), + _socket(std::move(socket)), _context(), _state(FNET_CONNECTED), // <-- NB _flags(), @@ -420,7 +410,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _callbackTarget(nullptr), _cleanup(nullptr) { - assert(_socket != nullptr); + assert(_socket.valid()); LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); } @@ -432,13 +422,13 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, FNET_IPacketHandler *adminHandler, FNET_Context adminContext, FNET_Context context, - FastOS_SocketInterface *mySocket, + vespalib::SocketHandle socket, const char *spec) - : FNET_IOComponent(owner, mySocket, spec, /* time-out = */ true), + : FNET_IOComponent(owner, socket.get(), spec, /* time-out = */ true), _streamer(streamer), _serverAdapter(serverAdapter), _adminChannel(nullptr), - _socket(mySocket), + _socket(std::move(socket)), _context(context), _state(FNET_CONNECTING), _flags(), @@ -455,7 +445,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _callbackTarget(nullptr), _cleanup(nullptr) { - assert(_socket != nullptr); + assert(_socket.valid()); if (adminHandler != nullptr) { FNET_Channel::UP admin(new FNET_Channel(FNET_NOID, this, adminHandler, adminContext)); _adminChannel = admin.get(); @@ -471,38 +461,21 @@ FNET_Connection::~FNET_Connection() delete _adminChannel; } assert(_cleanup == nullptr); - assert(_socket->GetSocketEvent() == nullptr); assert(!_flags._writeLock); - delete _socket; } bool FNET_Connection::Init() { - bool rc = _socket->SetSoBlocking(false) - && _socket->TuneTransport(); - - if (rc) { - if (GetConfig()->_tcpNoDelay) - _socket->SetNoDelay(true); - EnableReadEvent(true); - if (IsClient()) { - EnableWriteEvent(true); - if (!_socket->Connect()) { - int error = FastOS_Socket::GetLastError(); - if (error != FastOS_Socket::ERR_INPROGRESS && - error != FastOS_Socket::ERR_WOULDBLOCK) - { - rc = false; - LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error); - } - } - } + // set up relevant events + EnableReadEvent(true); + if (IsClient()) { + EnableWriteEvent(true); } // init server admin channel - if (rc && CanAcceptChannels() && _adminChannel == nullptr) { + if (CanAcceptChannels() && _adminChannel == nullptr) { FNET_Channel::UP ach(new FNET_Channel(FNET_NOID, this)); if (_serverAdapter->InitAdminChannel(ach.get())) { AddRef_NoLock(); @@ -511,7 +484,7 @@ FNET_Connection::Init() } // handle close by admin channel init - return (rc && _state <= FNET_CONNECTED); + return (_state <= FNET_CONNECTED); } @@ -680,10 +653,10 @@ FNET_Connection::CleanupHook() void FNET_Connection::Close() { - SetSocketEvent(nullptr); + detach_selector(); SetState(FNET_CLOSED); - _socket->Shutdown(); - _socket->Close(); + _ioc_socket_fd = -1; + _socket.reset(); } @@ -715,7 +688,7 @@ FNET_Connection::HandleWriteEvent() switch(_state) { case FNET_CONNECTING: - error = _socket->GetSoError(); + error = _socket.get_so_error(); if (error == 0) { // connect ok Lock(); _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index b84797624bf..3e37bcd583b 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -7,10 +7,10 @@ #include "context.h" #include "channellookup.h" #include "packetqueue.h" +#include <vespa/vespalib/net/socket_handle.h> class FNET_IPacketStreamer; class FNET_IServerAdapter; -class FastOS_SocketInterface; class FNET_IPacketHandler; /** @@ -80,7 +80,7 @@ private: FNET_IPacketStreamer *_streamer; // custom packet streamer FNET_IServerAdapter *_serverAdapter; // only on server side FNET_Channel *_adminChannel; // only on client side - FastOS_SocketInterface *_socket; // socket for this conn + vespalib::SocketHandle _socket; // socket for this conn FNET_Context _context; // connection context State _state; // connection state Flags _flags; // Packed flags. @@ -213,13 +213,13 @@ public: * @param owner the TransportThread object serving this connection * @param streamer custom packet streamer * @param serverAdapter object for custom channel creation - * @param mySocket the underlying socket used for IO + * @param socket the underlying socket used for IO * @param spec listen spec **/ FNET_Connection(FNET_TransportThread *owner, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter, - FastOS_SocketInterface *mySocket, + vespalib::SocketHandle socket, const char *spec); /** @@ -231,7 +231,7 @@ public: * @param adminHandler packet handler for admin channel * @param adminContext context for admin channel * @param context initial context for this connection - * @param mySocket the underlying socket used for IO + * @param socket the underlying socket used for IO * @param spec connect spec **/ FNET_Connection(FNET_TransportThread *owner, @@ -240,7 +240,7 @@ public: FNET_IPacketHandler *adminHandler, FNET_Context adminContext, FNET_Context context, - FastOS_SocketInterface *mySocket, + vespalib::SocketHandle socket, const char *spec); /** diff --git a/fnet/src/vespa/fnet/connector.cpp b/fnet/src/vespa/fnet/connector.cpp index a14a798d7c6..1372b5dc6ba 100644 --- a/fnet/src/vespa/fnet/connector.cpp +++ b/fnet/src/vespa/fnet/connector.cpp @@ -4,95 +4,56 @@ #include "transport_thread.h" #include "transport.h" #include "connection.h" -#include <vespa/fastos/serversocket.h> #include <vespa/log/log.h> LOG_SETUP(".fnet"); +using vespalib::SocketHandle; FNET_Connector::FNET_Connector(FNET_TransportThread *owner, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter, const char *spec, - int port, int backlog, - FastOS_SocketFactory *factory, - const char *strictBindHostName) - : FNET_IOComponent(owner, nullptr, spec, /* time-out = */ false), + vespalib::ServerSocket server_socket) + : FNET_IOComponent(owner, server_socket.get_fd(), spec, /* time-out = */ false), _streamer(streamer), _serverAdapter(serverAdapter), - _serverSocket(nullptr), - _strict(strictBindHostName != nullptr) + _server_socket(std::move(server_socket)) { - _serverSocket = new FastOS_ServerSocket(port, backlog, factory, - strictBindHostName); - assert(_serverSocket != nullptr); - _ioc_socket = _serverSocket; // set socket 'manually' } -FNET_Connector::~FNET_Connector() -{ - assert(_serverSocket->GetSocketEvent() == nullptr); - delete _serverSocket; -} - uint32_t FNET_Connector::GetPortNumber() const { - return _serverSocket->GetLocalPort(); -} - -bool -FNET_Connector::Init() -{ - bool rc = true; - - // check if strict binding went OK. - if (_strict) { - rc = _serverSocket->GetValidAddressFlag(); - } - - // configure socket for non-blocked listening - rc = (rc && (_serverSocket->SetSoBlocking(false)) - && (_serverSocket->Listen())); - - // print some debug output [XXX: remove this later] - if(rc) { - LOG(debug, "Connector(%s): TCP listen OK", GetSpec()); - EnableReadEvent(true); - } else { - LOG(warning, "Connector(%s): TCP listen FAILED", GetSpec()); - } - return rc; + return _server_socket.address().port(); } void FNET_Connector::Close() { - SetSocketEvent(nullptr); - _serverSocket->Close(); + detach_selector(); + _ioc_socket_fd = -1; + _server_socket = vespalib::ServerSocket(); } bool FNET_Connector::HandleReadEvent() { - FastOS_Socket *newSocket = nullptr; - FNET_Connection *conn = nullptr; - - newSocket = _serverSocket->AcceptPlain(); - if (newSocket != nullptr) { + SocketHandle handle = _server_socket.accept(); + if (handle.valid()) { FNET_Transport &transport = Owner()->owner(); - FNET_TransportThread *thread = transport.select_thread(newSocket, sizeof(FastOS_Socket)); - conn = new FNET_Connection(thread, _streamer, _serverAdapter, newSocket, GetSpec()); - if (conn->Init()) { - conn->Owner()->Add(conn, false); - } else { - LOG(debug, "Connector(%s): failed to init incoming connection", - GetSpec()); - delete conn; // this is legal. + FNET_TransportThread *thread = transport.select_thread(&handle, sizeof(handle)); + if (thread->tune(handle)) { + std::unique_ptr<FNET_Connection> conn = std::make_unique<FNET_Connection>(thread, _streamer, _serverAdapter, std::move(handle), GetSpec()); + if (conn->Init()) { + thread->Add(conn.release(), /*needRef = */ false); + } else { + LOG(debug, "Connector(%s): failed to init incoming connection", GetSpec()); + } } } return true; diff --git a/fnet/src/vespa/fnet/connector.h b/fnet/src/vespa/fnet/connector.h index 7463397afa4..3ca2cd8cd5a 100644 --- a/fnet/src/vespa/fnet/connector.h +++ b/fnet/src/vespa/fnet/connector.h @@ -3,11 +3,10 @@ #pragma once #include "iocomponent.h" +#include <vespa/vespalib/net/server_socket.h> -class FastOS_ServerSocket; class FNET_IPacketStreamer; class FNET_IServerAdapter; -class FastOS_SocketFactory; /** * Class used to listen for incoming connections on a single TCP/IP @@ -18,8 +17,7 @@ class FNET_Connector : public FNET_IOComponent private: FNET_IPacketStreamer *_streamer; FNET_IServerAdapter *_serverAdapter; - FastOS_ServerSocket *_serverSocket; - bool _strict; + vespalib::ServerSocket _server_socket; FNET_Connector(const FNET_Connector &); FNET_Connector &operator=(const FNET_Connector &); @@ -32,20 +30,13 @@ public: * @param streamer custom packet streamer * @param serverAdapter object for custom channel creation * @param spec listen spec for this connector - * @param port the port to listen on - * @param backlog accept queue length - * @param factory custom socket factory - * @param strictBindHostName bind strict to given hostname + * @param server_socket the underlying server socket **/ FNET_Connector(FNET_TransportThread *owner, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter, const char *spec, - int port, int backlog = 500, - FastOS_SocketFactory *factory = nullptr, - const char *strictBindHostName = nullptr); - ~FNET_Connector(); - + vespalib::ServerSocket server_socket); /** * Obtain the port number of the underlying server socket. @@ -55,15 +46,6 @@ public: uint32_t GetPortNumber() const; /** - * Try to create a listening server socket at the port number - * specified in the constructor. The socket is set to - * non-blocking. - * - * @return true on success, false on fail - **/ - bool Init(); - - /** * Close this connector. This method must be called in the transport * thread in order to avoid race conditions related to socket event * registration, deregistration and triggering. diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index f6154f04951..80122b3352f 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -2,18 +2,18 @@ #include "iocomponent.h" #include "transport_thread.h" -#include <vespa/fastos/socket.h> FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner, - FastOS_SocketInterface *mysocket, + int socket_fd, const char *spec, bool shouldTimeOut) : _ioc_next(nullptr), _ioc_prev(nullptr), _ioc_owner(owner), _ioc_counters(_ioc_owner->GetStatCounters()), - _ioc_socket(mysocket), + _ioc_socket_fd(socket_fd), + _ioc_selector(nullptr), _ioc_spec(nullptr), _flags(shouldTimeOut), _ioc_timestamp(fastos::ClockSystem::now()), @@ -30,6 +30,7 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner, FNET_IOComponent::~FNET_IOComponent() { free(_ioc_spec); + assert(_ioc_selector == nullptr); } FNET_Config * @@ -98,25 +99,30 @@ FNET_IOComponent::SubRef_NoLock() void -FNET_IOComponent::SetSocketEvent(FastOS_SocketEvent *event) +FNET_IOComponent::attach_selector(Selector &selector) { - bool rc = _ioc_socket->SetSocketEvent(event, this); - assert(rc); // XXX: error handling - (void) rc; + detach_selector(); + _ioc_selector = &selector; + _ioc_selector->add(_ioc_socket_fd, *this, _flags._ioc_readEnabled, _flags._ioc_writeEnabled); +} + - if (event != nullptr) { - _ioc_socket->EnableReadEvent(_flags._ioc_readEnabled); - _ioc_socket->EnableWriteEvent(_flags._ioc_writeEnabled); +void +FNET_IOComponent::detach_selector() +{ + if (_ioc_selector != nullptr) { + _ioc_selector->remove(_ioc_socket_fd); } + _ioc_selector = nullptr; } - void FNET_IOComponent::EnableReadEvent(bool enabled) { _flags._ioc_readEnabled = enabled; - if (_ioc_socket->GetSocketEvent() != nullptr) - _ioc_socket->EnableReadEvent(enabled); + if (_ioc_selector != nullptr) { + _ioc_selector->update(_ioc_socket_fd, *this, _flags._ioc_readEnabled, _flags._ioc_writeEnabled); + } } @@ -124,8 +130,9 @@ void FNET_IOComponent::EnableWriteEvent(bool enabled) { _flags._ioc_writeEnabled = enabled; - if (_ioc_socket->GetSocketEvent() != nullptr) - _ioc_socket->EnableWriteEvent(enabled); + if (_ioc_selector != nullptr) { + _ioc_selector->update(_ioc_socket_fd, *this, _flags._ioc_readEnabled, _flags._ioc_writeEnabled); + } } diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index cb719c3051d..7e17f5679de 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -5,12 +5,11 @@ #include "stats.h" #include <vespa/fastos/cond.h> #include <vespa/fastos/timestamp.h> +#include <vespa/vespalib/net/selector.h> class FNET_TransportThread; class FNET_StatCounters; -class FastOS_SocketInterface; class FNET_Config; -class FastOS_SocketEvent; /** * This is the common superclass of all components that may be part of @@ -25,6 +24,8 @@ class FNET_IOComponent FNET_IOComponent(const FNET_IOComponent &); FNET_IOComponent &operator=(const FNET_IOComponent &); + using Selector = vespalib::Selector<FNET_IOComponent>; + struct Flags { Flags(bool shouldTimeout) : _ioc_readEnabled(false), @@ -44,7 +45,8 @@ protected: FNET_IOComponent *_ioc_prev; // prev in list FNET_TransportThread *_ioc_owner; // owner(TransportThread) ref. FNET_StatCounters *_ioc_counters; // stat counters - FastOS_SocketInterface *_ioc_socket; // source of events. + int _ioc_socket_fd; // source of events. + Selector *_ioc_selector; // attached event selector char *_ioc_spec; // connect/listen spec Flags _flags; // Compressed representation of boolean flags; fastos::TimeStamp _ioc_timestamp; // last I/O activity @@ -65,11 +67,11 @@ public: * subclasses. * * @param owner the TransportThread object owning this component - * @param mysocket the socket used by this IOC + * @param socket_fd the socket handle used by this IOC * @param spec listen/connect spec for this IOC * @param shouldTimeOut should this IOC time out if idle ? **/ - FNET_IOComponent(FNET_TransportThread *owner, FastOS_SocketInterface *mysocket, + FNET_IOComponent(FNET_TransportThread *owner, int socket_fd, const char *spec, bool shouldTimeOut); @@ -267,13 +269,19 @@ public: /** - * Assign a FastOS_SocketEvent to this component. Before deleting an - * IOC, one must assign nullptr as the socket event. + * Attach an event selector to this component. Before deleting an + * IOC, one must first call detach_selector to detach the + * selector. * - * @param event the socket event to register with. + * @param selector event selector to be attached. **/ - void SetSocketEvent(FastOS_SocketEvent *event); + void attach_selector(Selector &selector); + /** + * Detach from the attached event selector. This will disable + * future selector events. + **/ + void detach_selector(); /** * Enable or disable read events. diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index e1a6d31a03e..1eed64b7e85 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -8,11 +8,16 @@ #include "connection.h" #include "transport.h" #include <vespa/vespalib/util/sync.h> -#include <vespa/fastos/socket.h> +#include <vespa/vespalib/net/socket_spec.h> +#include <vespa/vespalib/net/server_socket.h> #include <vespa/log/log.h> LOG_SETUP(".fnet"); +using vespalib::ServerSocket; +using vespalib::SocketHandle; +using vespalib::SocketSpec; + namespace { struct Sync : public FNET_IExecutable @@ -25,37 +30,6 @@ struct Sync : public FNET_IExecutable } // namespace<unnamed> - -char * -SplitString(char *input, const char *sep, int &argc, char **argv, int maxargs) -{ - int i; - int sepcnt = strlen(sep); - - for (argc = 0, argv[0] = input; *input != '\0'; input++) { - if (*input == '[' && argc == 0 && argv[argc] == input) { - argv[argc] = ++input; // Skip '[' - for (; *input != ']' && *input != '\0'; ++input); - if (*input == ']') - *input++ = '\0'; // Replace ']' - if (*input == '\0') - break; - } - for (i = 0; i < sepcnt; i++) { - if (*input == sep[i]) { - *input = '\0'; - if (*(argv[argc]) != '\0' && ++argc >= maxargs) - return (input + 1); // INCOMPLETE - argv[argc] = (input + 1); - break; // inner for loop - } - } - } - if (*(argv[argc]) != '\0') - argc++; - return nullptr; // COMPLETE -} - #ifndef IAM_DOXYGEN void FNET_TransportThread::StatsTask::PerformTask() @@ -158,7 +132,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, _queue.QueuePacket_NoLock(cpacket, context); Unlock(); if (wasEmpty) { - _socketEvent.AsyncWakeUp(); + _selector.wakeup(); } return true; } @@ -243,8 +217,7 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) _componentsTail(nullptr), _componentCnt(0), _deleteList(nullptr), - _socketEvent(), - _events(nullptr), + _selector(), _queue(), _myQueue(), _cond(), @@ -255,7 +228,6 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) _deleted(false) { _now.SetNow(); - assert(_socketEvent.GetCreateSuccess()); trapsigpipe(); } @@ -271,47 +243,29 @@ FNET_TransportThread::~FNET_TransportThread() } +bool +FNET_TransportThread::tune(SocketHandle &handle) const +{ + handle.set_keepalive(true); + handle.set_linger(true, 0); + handle.set_nodelay(_config._tcpNoDelay); + return handle.set_blocking(false); +} + + FNET_Connector* FNET_TransportThread::Listen(const char *spec, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter) { - int speclen = strlen(spec); - char tmp[1024]; - int argc; - char *argv[32]; - - assert(speclen < 1024); - memcpy(tmp, spec, speclen); - tmp[speclen] = '\0'; - if (SplitString(tmp, "/", argc, argv, 32) != nullptr - || argc != 2) - return nullptr; // wrong number of parameters - - // handle different connection types (currently only TCP/IP support) - if (strcasecmp(argv[0], "tcp") == 0) { - if (SplitString(argv[1], ":", argc, argv, 32) != nullptr - || argc < 1 || argc > 2) - return nullptr; // wrong number of parameters - - int port = atoi(argv[argc - 1]); // last param is port - if (port < 0) - return nullptr; - if (port == 0 && strcmp(argv[argc - 1], "0") != 0) - return nullptr; - FNET_Connector *connector; - connector = new FNET_Connector(this, streamer, serverAdapter, spec, port, - 500, nullptr, (argc == 2) ? argv[0] : nullptr); - if (connector->Init()) { - connector->AddRef_NoLock(); - Add(connector, /* needRef = */ false); - return connector; - } else { - delete connector; - return nullptr; - } - } else { - return nullptr; + ServerSocket server_socket{SocketSpec(spec)}; + if (server_socket.valid() && server_socket.set_blocking(false)) { + FNET_Connector *connector = new FNET_Connector(this, streamer, serverAdapter, spec, std::move(server_socket)); + connector->EnableReadEvent(true); + connector->AddRef_NoLock(); + Add(connector, /* needRef = */ false); + return connector; } + return nullptr; } @@ -322,43 +276,18 @@ FNET_TransportThread::Connect(const char *spec, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter, FNET_Context connContext) { - int speclen = strlen(spec); - char tmp[1024]; - int argc; - char *argv[32]; - - assert(speclen < 1024); - memcpy(tmp, spec, speclen); - tmp[speclen] = '\0'; - if (SplitString(tmp, "/", argc, argv, 32) != nullptr - || argc != 2) - return nullptr; // wrong number of parameters - - // handle different connection types (currently only TCP/IP support) - if (strcasecmp(argv[0], "tcp") == 0) { - if (SplitString(argv[1], ":", argc, argv, 32) != nullptr - || argc != 2) - return nullptr; // wrong number of parameters - - int port = atoi(argv[1]); - if (port <= 0) - return nullptr; - FastOS_Socket *mysocket = new FastOS_Socket(); - mysocket->SetAddress(port, argv[0]); - FNET_Connection *conn = new FNET_Connection(this, streamer, serverAdapter, - adminHandler, adminContext, - connContext, mysocket, spec); + auto tweak = [this](SocketHandle &handle) { return tune(handle); }; + SocketHandle handle = SocketSpec(spec).client_address().connect(tweak); + if (handle.valid()) { + std::unique_ptr<FNET_Connection> conn = std::make_unique<FNET_Connection>(this, streamer, serverAdapter, + adminHandler, adminContext, connContext, std::move(handle), spec); if (conn->Init()) { conn->AddRef_NoLock(); - Add(conn, /* needRef = */ false); - return conn; - } else { - delete conn; - return nullptr; + Add(conn.get(), /*needRef = */ false); + return conn.release(); } - } else { - return nullptr; } + return nullptr; } @@ -457,11 +386,12 @@ FNET_TransportThread::ShutDown(bool waitFinished) wasEmpty = _queue.IsEmpty_NoLock(); } Unlock(); - if (wasEmpty) - _socketEvent.AsyncWakeUp(); - - if (waitFinished) + if (wasEmpty) { + _selector.wakeup(); + } + if (waitFinished) { WaitFinished(); + } } @@ -499,10 +429,6 @@ FNET_TransportThread::InitEventLoop() LOG(error, "Transport: InitEventLoop: object was deleted!"); return false; } - - _events = new FastOS_IOEvent[EVT_MAX]; - assert(_events != nullptr); - _now.SetNow(); _startTime = _now; _statTime = _now; @@ -511,16 +437,87 @@ FNET_TransportThread::InitEventLoop() } +void +FNET_TransportThread::handle_wakeup() +{ + Lock(); + CountEvent(_queue.FlushPackets_NoLock(&_myQueue)); + Unlock(); + + FNET_Context context; + FNET_Packet *packet = nullptr; + while ((packet = _myQueue.DequeuePacket_NoLock(&context)) != nullptr) { + + if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_EXECUTE) { + context._value.EXECUTABLE->execute(); + continue; + } + + if (context._value.IOC->_flags._ioc_delete) { + context._value.IOC->SubRef(); + continue; + } + + switch (packet->GetCommand()) { + case FNET_ControlPacket::FNET_CMD_IOC_ADD: + AddComponent(context._value.IOC); + context._value.IOC->_flags._ioc_added = true; + context._value.IOC->attach_selector(_selector); + break; + case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_READ: + context._value.IOC->EnableReadEvent(true); + context._value.IOC->SubRef(); + break; + case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_READ: + context._value.IOC->EnableReadEvent(false); + context._value.IOC->SubRef(); + break; + case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE: + context._value.IOC->EnableWriteEvent(true); + context._value.IOC->SubRef(); + break; + case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_WRITE: + context._value.IOC->EnableWriteEvent(false); + context._value.IOC->SubRef(); + break; + case FNET_ControlPacket::FNET_CMD_IOC_CLOSE: + if (context._value.IOC->_flags._ioc_added) { + RemoveComponent(context._value.IOC); + context._value.IOC->SubRef(); + } + context._value.IOC->Close(); + AddDeleteComponent(context._value.IOC); + break; + } + } +} + + +void +FNET_TransportThread::handle_event(FNET_IOComponent &ctx, bool read, bool write) +{ + if (!ctx._flags._ioc_delete) { + bool rc = true; + if (read) { + rc = rc && ctx.HandleReadEvent(); + } + if (write) { + rc = rc && ctx.HandleWriteEvent(); + } + if (!rc) { // IOC is broken, close it + RemoveComponent(&ctx); + ctx.Close(); + AddDeleteComponent(&ctx); + } + } +} + + bool FNET_TransportThread::EventLoopIteration() { - FNET_Packet *packet = nullptr; - FNET_Context context; FNET_IOComponent *component = nullptr; - int evt_cnt = 0; - FastOS_IOEvent *events = _events; int msTimeout = FNET_Scheduler::SLOT_TICK; - bool wakeUp = false; #ifdef FNET_SANITY_CHECKS FastOS_Time beforeGetEvents; @@ -537,7 +534,7 @@ FNET_TransportThread::EventLoopIteration() #endif // obtain I/O events - evt_cnt = _socketEvent.GetEvents(&wakeUp, msTimeout, events, EVT_MAX); + _selector.poll(msTimeout); CountEventLoop(); // sample current time (performed once per event loop iteration) @@ -551,83 +548,9 @@ FNET_TransportThread::EventLoopIteration() extractTime, msTimeout); #endif - // report event error (if any) - if (evt_cnt < 0) { - std::string str = FastOS_Socket::getLastErrorString(); - LOG(spam, "Transport: event error: %s", str.c_str()); - } else { - CountIOEvent(evt_cnt); - } - - // handle internal transport layer events - if (wakeUp) { - - Lock(); - CountEvent(_queue.FlushPackets_NoLock(&_myQueue)); - Unlock(); - - while ((packet = _myQueue.DequeuePacket_NoLock(&context)) != nullptr) { - - if (context._value.IOC->_flags._ioc_delete) { - context._value.IOC->SubRef(); - continue; - } - - switch (packet->GetCommand()) { - case FNET_ControlPacket::FNET_CMD_IOC_ADD: - AddComponent(context._value.IOC); - context._value.IOC->_flags._ioc_added = true; - context._value.IOC->SetSocketEvent(&_socketEvent); - break; - case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_READ: - context._value.IOC->EnableReadEvent(true); - context._value.IOC->SubRef(); - break; - case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_READ: - context._value.IOC->EnableReadEvent(false); - context._value.IOC->SubRef(); - break; - case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE: - context._value.IOC->EnableWriteEvent(true); - context._value.IOC->SubRef(); - break; - case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_WRITE: - context._value.IOC->EnableWriteEvent(false); - context._value.IOC->SubRef(); - break; - case FNET_ControlPacket::FNET_CMD_IOC_CLOSE: - if (context._value.IOC->_flags._ioc_added) { - RemoveComponent(context._value.IOC); - context._value.IOC->SubRef(); - } - context._value.IOC->Close(); - AddDeleteComponent(context._value.IOC); - break; - case FNET_ControlPacket::FNET_CMD_EXECUTE: - context._value.EXECUTABLE->execute(); - break; - } - } - } - - // handle I/O events - for (int i = 0; i < evt_cnt; i++) { - - component = (FNET_IOComponent *) events[i]._eventAttribute; - if (component == nullptr || component->_flags._ioc_delete) - continue; - - bool rc = true; - if (events[i]._readOccurred) - rc = rc && component->HandleReadEvent(); - if (events[i]._writeOccurred) - rc = rc && component->HandleWriteEvent(); - if (!rc) { // IOC is broken, close it - RemoveComponent(component); - component->Close(); - AddDeleteComponent(component); - } - } + // handle wakeup and io-events + CountIOEvent(_selector.num_events()); + _selector.dispatch(*this); // handle IOC time-outs if (_config._iocTimeOut > 0) { @@ -666,6 +589,8 @@ FNET_TransportThread::EventLoopIteration() Unlock(); // discard remaining events + FNET_Context context; + FNET_Packet *packet = nullptr; while ((packet = _myQueue.DequeuePacket_NoLock(&context)) != nullptr) { if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_EXECUTE) { context._value.EXECUTABLE->execute(); @@ -691,8 +616,6 @@ FNET_TransportThread::EventLoopIteration() _queue.IsEmpty_NoLock() && _myQueue.IsEmpty_NoLock()); - delete [] _events; - Lock(); _finished = true; if (_waitFinished) diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index a9e1f056d17..72402aa9e10 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -8,8 +8,9 @@ #include "packetqueue.h" #include "stats.h" #include <vespa/fastos/thread.h> -#include <vespa/fastos/socketevent.h> #include <vespa/fastos/time.h> +#include <vespa/vespalib/net/socket_handle.h> +#include <vespa/vespalib/net/selector.h> class FNET_Transport; class FNET_ControlPacket; @@ -26,9 +27,7 @@ class FNET_TransportThread : public FastOS_Runnable friend class FNET_IOComponent; public: - enum { - EVT_MAX = 4096 - }; + using Selector = vespalib::Selector<FNET_IOComponent>; #ifndef IAM_DOXYGEN class StatsTask : public FNET_Task @@ -61,8 +60,7 @@ private: FNET_IOComponent *_componentsTail; // I/O component list tail uint32_t _componentCnt; // # of components FNET_IOComponent *_deleteList; // IOC delete list - FastOS_SocketEvent _socketEvent; // I/O event generator - FastOS_IOEvent *_events; // I/O event array + Selector _selector; // I/O event generator FNET_PacketQueue_NoLock _queue; // outer event queue FNET_PacketQueue_NoLock _myQueue; // inner event queue FastOS_Cond _cond; // used for synchronization @@ -256,6 +254,11 @@ public: **/ FNET_Transport &owner() const { return _owner; } + /** + * Tune the given socket handle to be used as an async transport + * connection. + **/ + bool tune(vespalib::SocketHandle &handle) const; /** * Add a network listener in an abstract way. The given 'spec' @@ -587,6 +590,12 @@ public: bool InitEventLoop(); + // selector call-back for selector wakeup + void handle_wakeup(); + + // selector call-back for io-events + void handle_event(FNET_IOComponent &ctx, bool read, bool write); + /** * Perform a single transport thread event loop iteration. This * method is called by the FRT_Transport::Run method. If you want to |