diff options
author | HÃ¥vard Pettersen <havardpe@gmail.com> | 2018-08-30 14:12:22 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-30 14:12:22 +0200 |
commit | fdeffcd889575dac81f76b534cab833e1bd0a2e4 (patch) | |
tree | c8d5e3afa68ac2843db3c8f4a2a9f3f12e67709b /fnet/src | |
parent | 285594e2477a164a781d61a425b96205c5b4d728 (diff) | |
parent | a7eca91afb48a0d1664ec53eaebb86ffa069722b (diff) |
Merge pull request #6686 from vespa-engine/havardpe/integrate-crypto-engine-in-fnet
integrate Crypto{Engine,Socket} into fnet
Diffstat (limited to 'fnet/src')
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 163 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 19 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.cpp | 9 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 28 |
4 files changed, 161 insertions, 58 deletions
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index ff6c1ab0b1d..07086ca54a2 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -219,54 +219,109 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, } } +bool +FNET_Connection::handshake() +{ + bool broken = false; + switch (_socket->handshake()) { + case vespalib::CryptoSocket::HandshakeResult::FAIL: + LOG(debug, "Connection(%s): handshake failed", GetSpec()); + SetState(FNET_CLOSED); + broken = true; + break; + case vespalib::CryptoSocket::HandshakeResult::DONE: { + EnableReadEvent(true); + EnableWriteEvent(writePendingAfterConnect()); + size_t chunk_size = std::max(size_t(FNET_READ_SIZE), _socket->min_read_buffer_size()); + uint32_t ignore_stats = 0; + ssize_t res = 0; + do { // drain input pipeline + _input.EnsureFree(chunk_size); + res = _socket->drain(_input.GetFree(), _input.GetFreeLen()); + if (res > 0) { + _input.FreeToData((uint32_t)res); + broken = !handle_packets(ignore_stats); + _input.resetIfEmpty(); + } + } while ((res > 0) && !broken); } + break; + case vespalib::CryptoSocket::HandshakeResult::NEED_READ: + EnableReadEvent(true); + EnableWriteEvent(false); + break; + case vespalib::CryptoSocket::HandshakeResult::NEED_WRITE: + EnableReadEvent(false); + EnableWriteEvent(true); + break; + } + return !broken; +} + +bool +FNET_Connection::handle_packets(uint32_t &read_packets) +{ + bool broken = false; + for (bool done = false; !done;) { // handle each complete packet in the buffer. + if (!_flags._gotheader) { + _flags._gotheader = _streamer->GetPacketInfo(&_input, &_packetLength, + &_packetCode, &_packetCHID, + &broken); + } + if (_flags._gotheader && (_input.GetDataLen() >= _packetLength)) { + read_packets++; + HandlePacket(_packetLength, _packetCode, _packetCHID); + _flags._gotheader = false; // reset header flag. + } else { + done = true; + } + } + return !broken; +} bool FNET_Connection::Read() { + size_t chunk_size = std::max(size_t(FNET_READ_SIZE), _socket->min_read_buffer_size()); uint32_t readData = 0; // total data read uint32_t readPackets = 0; // total packets read int readCnt = 0; // read count bool broken = false; // is this conn broken ? ssize_t res; // single read result - _input.EnsureFree(FNET_READ_SIZE); - res = _socket.read(_input.GetFree(), _input.GetFreeLen()); + _input.EnsureFree(chunk_size); + res = _socket->read(_input.GetFree(), _input.GetFreeLen()); readCnt++; while (res > 0) { _input.FreeToData((uint32_t)res); readData += (uint32_t)res; - - for (;;) { // handle each complete packet in the buffer. - - if (!_flags._gotheader) - _flags._gotheader = _streamer->GetPacketInfo(&_input, &_packetLength, - &_packetCode, &_packetCHID, - &broken); - - if (_flags._gotheader && _input.GetDataLen() >= _packetLength) { - readPackets++; - HandlePacket(_packetLength, _packetCode, _packetCHID); - _flags._gotheader = false; // reset header flag. - } else { - if (broken) - goto done_read; - break; - } - } + broken = !handle_packets(readPackets); _input.resetIfEmpty(); - - if (_input.GetFreeLen() > 0 - || readCnt >= FNET_READ_REDO) // prevent starvation + if (broken || (_input.GetFreeLen() > 0) || (readCnt >= FNET_READ_REDO)) { goto done_read; - - _input.EnsureFree(FNET_READ_SIZE); - res = _socket.read(_input.GetFree(), _input.GetFreeLen()); + } + _input.EnsureFree(chunk_size); + res = _socket->read(_input.GetFree(), _input.GetFreeLen()); readCnt++; } done_read: + while ((res > 0) && !broken) { // drain input pipeline + _input.EnsureFree(chunk_size); + res = _socket->drain(_input.GetFree(), _input.GetFreeLen()); + readCnt++; + if (res > 0) { + _input.FreeToData((uint32_t)res); + readData += (uint32_t)res; + broken = !handle_packets(readPackets); + _input.resetIfEmpty(); + } else if (res == 0) { // fully drained -> EWOULDBLOCK + errno = EWOULDBLOCK; + res = -1; + } + } + if (readData > 0) { UpdateTimeOut(); CountDataRead(readData); @@ -298,6 +353,7 @@ done_read: bool FNET_Connection::Write(bool direct) { + uint32_t my_write_work = 0; uint32_t writtenData = 0; // total data written uint32_t writtenPackets = 0; // total packets written int writeCnt = 0; // write count @@ -330,7 +386,7 @@ FNET_Connection::Write(bool direct) // write data - res = _socket.write(_output.GetData(), _output.GetDataLen()); + res = _socket->write(_output.GetData(), _output.GetDataLen()); writeCnt++; if (res > 0) { _output.DataToDead((uint32_t)res); @@ -342,6 +398,17 @@ FNET_Connection::Write(bool direct) !_myQueue.IsEmpty_NoLock() && writeCnt < FNET_WRITE_REDO); + if ((_output.GetDataLen() > 0)) { + ++my_write_work; + } + + if (res >= 0) { // flush output pipeline + res = _socket->flush(); + while (res > 0) { + res = _socket->flush(); + } + } + if (writtenData > 0) { uint32_t maxSize = GetConfig()->_maxOutputBufferSize; if (maxSize > 0 && _output.GetBufSize() > maxSize) { @@ -350,7 +417,11 @@ FNET_Connection::Write(bool direct) } if (res < 0) { - broken = ((errno != EWOULDBLOCK) && (errno != EAGAIN)); + if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) { + ++my_write_work; // incomplete write/flush + } else { + broken = true; + } if (broken && (errno != ECONNRESET)) { LOG(debug, "Connection(%s): write error: %d", GetSpec(), errno); } @@ -359,7 +430,7 @@ FNET_Connection::Write(bool direct) std::unique_lock<std::mutex> guard(_ioc_lock); _writeWork = _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock() - + ((_output.GetDataLen() > 0) ? 1 : 0); + + my_write_work; _flags._writeLock = false; if (_flags._discarding) { _ioc_cond.notify_all(); @@ -407,10 +478,10 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _streamer(streamer), _serverAdapter(serverAdapter), _adminChannel(nullptr), - _socket(std::move(socket)), + _socket(owner->owner().create_crypto_socket(std::move(socket), true)), _resolve_handler(nullptr), _context(), - _state(FNET_CONNECTED), // <-- NB + _state(FNET_CONNECTING), _flags(), _packetLength(0), _packetCode(0), @@ -425,9 +496,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _callbackTarget(nullptr), _cleanup(nullptr) { - assert(_socket.valid()); - LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), - GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); + assert(_socket && (_socket->get_fd() >= 0)); } @@ -484,9 +553,7 @@ FNET_Connection::Init() { // set up relevant events EnableReadEvent(true); - if (IsClient()) { - EnableWriteEvent(true); - } + EnableWriteEvent(true); // init server admin channel if (CanAcceptChannels() && _adminChannel == nullptr) { @@ -516,11 +583,11 @@ FNET_Connection::handle_add_event() { if (_resolve_handler) { auto tweak = [this](vespalib::SocketHandle &handle) { return Owner()->tune(handle); }; - _socket = _resolve_handler->address.connect(tweak); - _ioc_socket_fd = _socket.get(); + _socket = Owner()->owner().create_crypto_socket(_resolve_handler->address.connect(tweak), false); + _ioc_socket_fd = _socket->get_fd(); _resolve_handler.reset(); } - return _socket.valid(); + return (_socket && (_socket->get_fd() >= 0)); } @@ -693,7 +760,8 @@ FNET_Connection::HandleReadEvent() bool broken = false; // is connection broken ? switch(_state) { - case FNET_CONNECTING: // ignore read events while connecting + case FNET_CONNECTING: + broken = !handshake(); break; case FNET_CONNECTED: broken = !Read(); @@ -720,22 +788,11 @@ FNET_Connection::writePendingAfterConnect() bool FNET_Connection::HandleWriteEvent() { - int error; // socket error code bool broken = false; // is connection broken ? switch(_state) { case FNET_CONNECTING: - error = _socket.get_so_error(); - if (error == 0) { // connect ok - if (!writePendingAfterConnect()) { - EnableWriteEvent(false); - } - } else { - LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error); - - SetState(FNET_CLOSED); // connect failed. - broken = true; - } + broken = !handshake(); break; case FNET_CONNECTED: { diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index ef883d84989..120c675dc70 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -9,6 +9,7 @@ #include "packetqueue.h" #include <vespa/vespalib/net/socket_handle.h> #include <vespa/vespalib/net/async_resolver.h> +#include <vespa/vespalib/net/crypto_socket.h> class FNET_IPacketStreamer; class FNET_IServerAdapter; @@ -89,7 +90,7 @@ private: FNET_IPacketStreamer *_streamer; // custom packet streamer FNET_IServerAdapter *_serverAdapter; // only on server side FNET_Channel *_adminChannel; // only on client side - vespalib::SocketHandle _socket; // socket for this conn + vespalib::CryptoSocket::UP _socket; // socket for this conn ResolveHandlerSP _resolve_handler; // async resolve callback FNET_Context _context; // connection context State _state; // connection state @@ -200,6 +201,22 @@ private: void HandlePacket(uint32_t plen, uint32_t pcode, uint32_t chid); /** + * Handle crypto handshaking + * + * @return false if socket is broken. + **/ + bool handshake(); + + /** + * Handle all packets in the input buffer, calling HandlePacket + * for each one. + * + * @return false if socket is broken. + * @param read_packets count read packets here + **/ + bool handle_packets(uint32_t &read_packets); + + /** * Read incoming data from socket. * * @return false if socket is broken. diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index 675489fb229..0a484f9caaa 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -22,8 +22,9 @@ struct HashState { } // namespace <unnamed> -FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads) +FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads) : _async_resolver(std::move(resolver)), + _crypto_engine(std::move(crypto)), _threads() { assert(num_threads >= 1); @@ -44,6 +45,12 @@ FNET_Transport::resolve_async(const vespalib::string &spec, _async_resolver->resolve_async(spec, std::move(result_handler)); } +vespalib::CryptoSocket::UP +FNET_Transport::create_crypto_socket(vespalib::SocketHandle socket, bool is_server) +{ + return _crypto_engine->create_crypto_socket(std::move(socket), is_server); +} + FNET_TransportThread * FNET_Transport::select_thread(const void *key, size_t key_len) const { diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 8ad48d37cad..a9c9eee2296 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -6,6 +6,7 @@ #include <memory> #include <vector> #include <vespa/vespalib/net/async_resolver.h> +#include <vespa/vespalib/net/crypto_engine.h> class FastOS_TimeInterface; class FNET_TransportThread; @@ -28,6 +29,7 @@ private: using Threads = std::vector<Thread>; vespalib::AsyncResolver::SP _async_resolver; + vespalib::CryptoEngine::SP _crypto_engine; Threads _threads; public: @@ -38,9 +40,16 @@ public: * the current thread become the transport thread. Main may only * be called for single-threaded transports. **/ - FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads); - FNET_Transport(size_t num_threads = 1) - : FNET_Transport(vespalib::AsyncResolver::get_shared(), num_threads) {} + FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads); + + FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads) + : FNET_Transport(std::move(resolver), vespalib::CryptoEngine::get_default(), num_threads) {} + FNET_Transport(vespalib::CryptoEngine::SP crypto, size_t num_threads) + : FNET_Transport(vespalib::AsyncResolver::get_shared(), std::move(crypto), num_threads) {} + FNET_Transport(size_t num_threads) + : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), num_threads) {} + FNET_Transport() + : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), 1) {} ~FNET_Transport(); /** @@ -57,6 +66,19 @@ public: vespalib::AsyncResolver::ResultHandler::WP result_handler); /** + * Wrap a plain socket endpoint in a CryptoSocket. The + * implementation will be determined by the CryptoEngine used by + * this Transport. + * + * @return socket abstraction able to perform encryption and decryption + * @param socket low-level socket + * @param is_server which end of the connection the socket + * represents. This is needed to support + * asymmetrical handshaking. + **/ + vespalib::CryptoSocket::UP create_crypto_socket(vespalib::SocketHandle socket, bool is_server); + + /** * Select one of the underlying transport threads. The selection * is based on hashing the given key as well as the current stack * pointer. |