aboutsummaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2018-08-27 08:56:49 +0000
committerHåvard Pettersen <havardpe@oath.com>2018-08-27 12:01:22 +0000
commitba1100ee847c6085a85cda1f264e7b699d85c407 (patch)
treefb07b3df5d5214ac324d14f74e7a841f112b4bfb /fnet
parent627e9f07b4ddbf25eb69946fed3131af15cc3996 (diff)
integrate Crypto{Engine,Socket} into fnet
Diffstat (limited to 'fnet')
-rw-r--r--fnet/src/vespa/fnet/connection.cpp148
-rw-r--r--fnet/src/vespa/fnet/connection.h19
-rw-r--r--fnet/src/vespa/fnet/transport.cpp9
-rw-r--r--fnet/src/vespa/fnet/transport.h28
4 files changed, 146 insertions, 58 deletions
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index ff6c1ab0b1d..fc51697c8ea 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -219,54 +219,97 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
}
}
+bool
+FNET_Connection::handshake()
+{
+ bool broken = false;
+ switch (_socket->handshake()) {
+ case vespalib::CryptoSocket::HandshakeResult::FAIL:
+ LOG(debug, "Connection(%s): handshake failed", GetSpec());
+ SetState(FNET_CLOSED);
+ broken = true;
+ break;
+ case vespalib::CryptoSocket::HandshakeResult::DONE:
+ EnableReadEvent(true);
+ EnableWriteEvent(writePendingAfterConnect());
+ break;
+ case vespalib::CryptoSocket::HandshakeResult::NEED_READ:
+ EnableReadEvent(true);
+ EnableWriteEvent(false);
+ break;
+ case vespalib::CryptoSocket::HandshakeResult::NEED_WRITE:
+ EnableReadEvent(false);
+ EnableWriteEvent(true);
+ break;
+ }
+ return !broken;
+}
+
+bool
+FNET_Connection::handle_packets(uint32_t &read_packets)
+{
+ bool broken = false;
+ for (bool done = false; !done;) { // handle each complete packet in the buffer.
+ if (!_flags._gotheader) {
+ _flags._gotheader = _streamer->GetPacketInfo(&_input, &_packetLength,
+ &_packetCode, &_packetCHID,
+ &broken);
+ }
+ if (_flags._gotheader && (_input.GetDataLen() >= _packetLength)) {
+ read_packets++;
+ HandlePacket(_packetLength, _packetCode, _packetCHID);
+ _flags._gotheader = false; // reset header flag.
+ } else {
+ done = true;
+ }
+ }
+ return !broken;
+}
bool
FNET_Connection::Read()
{
+ size_t chunk_size = std::max(size_t(FNET_READ_SIZE), _socket->min_read_buffer_size());
uint32_t readData = 0; // total data read
uint32_t readPackets = 0; // total packets read
int readCnt = 0; // read count
bool broken = false; // is this conn broken ?
ssize_t res; // single read result
- _input.EnsureFree(FNET_READ_SIZE);
- res = _socket.read(_input.GetFree(), _input.GetFreeLen());
+ _input.EnsureFree(chunk_size);
+ res = _socket->read(_input.GetFree(), _input.GetFreeLen());
readCnt++;
while (res > 0) {
_input.FreeToData((uint32_t)res);
readData += (uint32_t)res;
-
- for (;;) { // handle each complete packet in the buffer.
-
- if (!_flags._gotheader)
- _flags._gotheader = _streamer->GetPacketInfo(&_input, &_packetLength,
- &_packetCode, &_packetCHID,
- &broken);
-
- if (_flags._gotheader && _input.GetDataLen() >= _packetLength) {
- readPackets++;
- HandlePacket(_packetLength, _packetCode, _packetCHID);
- _flags._gotheader = false; // reset header flag.
- } else {
- if (broken)
- goto done_read;
- break;
- }
- }
+ broken = !handle_packets(readPackets);
_input.resetIfEmpty();
-
- if (_input.GetFreeLen() > 0
- || readCnt >= FNET_READ_REDO) // prevent starvation
+ if (broken || (_input.GetFreeLen() > 0) || (readCnt >= FNET_READ_REDO)) {
goto done_read;
-
- _input.EnsureFree(FNET_READ_SIZE);
- res = _socket.read(_input.GetFree(), _input.GetFreeLen());
+ }
+ _input.EnsureFree(chunk_size);
+ res = _socket->read(_input.GetFree(), _input.GetFreeLen());
readCnt++;
}
done_read:
+ while ((res > 0) && !broken) { // drain input pipeline
+ _input.EnsureFree(chunk_size);
+ res = _socket->drain(_input.GetFree(), _input.GetFreeLen());
+ readCnt++;
+ if (res > 0) {
+ _input.FreeToData((uint32_t)res);
+ readData += (uint32_t)res;
+ broken = !handle_packets(readPackets);
+ _input.resetIfEmpty();
+ } else if (res == 0) { // fully drained -> EWOULDBLOCK
+ errno = EWOULDBLOCK;
+ res = -1;
+ }
+ }
+
if (readData > 0) {
UpdateTimeOut();
CountDataRead(readData);
@@ -298,6 +341,7 @@ done_read:
bool
FNET_Connection::Write(bool direct)
{
+ uint32_t my_write_work = 0;
uint32_t writtenData = 0; // total data written
uint32_t writtenPackets = 0; // total packets written
int writeCnt = 0; // write count
@@ -330,7 +374,7 @@ FNET_Connection::Write(bool direct)
// write data
- res = _socket.write(_output.GetData(), _output.GetDataLen());
+ res = _socket->write(_output.GetData(), _output.GetDataLen());
writeCnt++;
if (res > 0) {
_output.DataToDead((uint32_t)res);
@@ -342,6 +386,14 @@ FNET_Connection::Write(bool direct)
!_myQueue.IsEmpty_NoLock() &&
writeCnt < FNET_WRITE_REDO);
+ if ((_output.GetDataLen() > 0)) {
+ ++my_write_work;
+ }
+
+ while ((res > 0) && !broken) { // flush output pipeline
+ res = _socket->flush();
+ }
+
if (writtenData > 0) {
uint32_t maxSize = GetConfig()->_maxOutputBufferSize;
if (maxSize > 0 && _output.GetBufSize() > maxSize) {
@@ -350,7 +402,11 @@ FNET_Connection::Write(bool direct)
}
if (res < 0) {
- broken = ((errno != EWOULDBLOCK) && (errno != EAGAIN));
+ if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) {
+ ++my_write_work; // incomplete write/flush
+ } else {
+ broken = true;
+ }
if (broken && (errno != ECONNRESET)) {
LOG(debug, "Connection(%s): write error: %d", GetSpec(), errno);
}
@@ -359,7 +415,7 @@ FNET_Connection::Write(bool direct)
std::unique_lock<std::mutex> guard(_ioc_lock);
_writeWork = _queue.GetPacketCnt_NoLock()
+ _myQueue.GetPacketCnt_NoLock()
- + ((_output.GetDataLen() > 0) ? 1 : 0);
+ + my_write_work;
_flags._writeLock = false;
if (_flags._discarding) {
_ioc_cond.notify_all();
@@ -407,10 +463,10 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_streamer(streamer),
_serverAdapter(serverAdapter),
_adminChannel(nullptr),
- _socket(std::move(socket)),
+ _socket(owner->owner().create_crypto_socket(std::move(socket), true)),
_resolve_handler(nullptr),
_context(),
- _state(FNET_CONNECTED), // <-- NB
+ _state(FNET_CONNECTING),
_flags(),
_packetLength(0),
_packetCode(0),
@@ -425,9 +481,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_callbackTarget(nullptr),
_cleanup(nullptr)
{
- assert(_socket.valid());
- LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
- GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
+ assert(_socket && (_socket->get_fd() >= 0));
}
@@ -484,9 +538,7 @@ FNET_Connection::Init()
{
// set up relevant events
EnableReadEvent(true);
- if (IsClient()) {
- EnableWriteEvent(true);
- }
+ EnableWriteEvent(true);
// init server admin channel
if (CanAcceptChannels() && _adminChannel == nullptr) {
@@ -516,11 +568,11 @@ FNET_Connection::handle_add_event()
{
if (_resolve_handler) {
auto tweak = [this](vespalib::SocketHandle &handle) { return Owner()->tune(handle); };
- _socket = _resolve_handler->address.connect(tweak);
- _ioc_socket_fd = _socket.get();
+ _socket = Owner()->owner().create_crypto_socket(_resolve_handler->address.connect(tweak), false);
+ _ioc_socket_fd = _socket->get_fd();
_resolve_handler.reset();
}
- return _socket.valid();
+ return (_socket && (_socket->get_fd() >= 0));
}
@@ -693,7 +745,8 @@ FNET_Connection::HandleReadEvent()
bool broken = false; // is connection broken ?
switch(_state) {
- case FNET_CONNECTING: // ignore read events while connecting
+ case FNET_CONNECTING:
+ broken = !handshake();
break;
case FNET_CONNECTED:
broken = !Read();
@@ -720,22 +773,11 @@ FNET_Connection::writePendingAfterConnect()
bool
FNET_Connection::HandleWriteEvent()
{
- int error; // socket error code
bool broken = false; // is connection broken ?
switch(_state) {
case FNET_CONNECTING:
- error = _socket.get_so_error();
- if (error == 0) { // connect ok
- if (!writePendingAfterConnect()) {
- EnableWriteEvent(false);
- }
- } else {
- LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error);
-
- SetState(FNET_CLOSED); // connect failed.
- broken = true;
- }
+ broken = !handshake();
break;
case FNET_CONNECTED:
{
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index ef883d84989..120c675dc70 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -9,6 +9,7 @@
#include "packetqueue.h"
#include <vespa/vespalib/net/socket_handle.h>
#include <vespa/vespalib/net/async_resolver.h>
+#include <vespa/vespalib/net/crypto_socket.h>
class FNET_IPacketStreamer;
class FNET_IServerAdapter;
@@ -89,7 +90,7 @@ private:
FNET_IPacketStreamer *_streamer; // custom packet streamer
FNET_IServerAdapter *_serverAdapter; // only on server side
FNET_Channel *_adminChannel; // only on client side
- vespalib::SocketHandle _socket; // socket for this conn
+ vespalib::CryptoSocket::UP _socket; // socket for this conn
ResolveHandlerSP _resolve_handler; // async resolve callback
FNET_Context _context; // connection context
State _state; // connection state
@@ -200,6 +201,22 @@ private:
void HandlePacket(uint32_t plen, uint32_t pcode, uint32_t chid);
/**
+ * Handle crypto handshaking
+ *
+ * @return false if socket is broken.
+ **/
+ bool handshake();
+
+ /**
+ * Handle all packets in the input buffer, calling HandlePacket
+ * for each one.
+ *
+ * @return false if socket is broken.
+ * @param read_packets count read packets here
+ **/
+ bool handle_packets(uint32_t &read_packets);
+
+ /**
* Read incoming data from socket.
*
* @return false if socket is broken.
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index 675489fb229..0a484f9caaa 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -22,8 +22,9 @@ struct HashState {
} // namespace <unnamed>
-FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads)
+FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads)
: _async_resolver(std::move(resolver)),
+ _crypto_engine(std::move(crypto)),
_threads()
{
assert(num_threads >= 1);
@@ -44,6 +45,12 @@ FNET_Transport::resolve_async(const vespalib::string &spec,
_async_resolver->resolve_async(spec, std::move(result_handler));
}
+vespalib::CryptoSocket::UP
+FNET_Transport::create_crypto_socket(vespalib::SocketHandle socket, bool is_server)
+{
+ return _crypto_engine->create_crypto_socket(std::move(socket), is_server);
+}
+
FNET_TransportThread *
FNET_Transport::select_thread(const void *key, size_t key_len) const
{
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 8ad48d37cad..a9c9eee2296 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -6,6 +6,7 @@
#include <memory>
#include <vector>
#include <vespa/vespalib/net/async_resolver.h>
+#include <vespa/vespalib/net/crypto_engine.h>
class FastOS_TimeInterface;
class FNET_TransportThread;
@@ -28,6 +29,7 @@ private:
using Threads = std::vector<Thread>;
vespalib::AsyncResolver::SP _async_resolver;
+ vespalib::CryptoEngine::SP _crypto_engine;
Threads _threads;
public:
@@ -38,9 +40,16 @@ public:
* the current thread become the transport thread. Main may only
* be called for single-threaded transports.
**/
- FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads);
- FNET_Transport(size_t num_threads = 1)
- : FNET_Transport(vespalib::AsyncResolver::get_shared(), num_threads) {}
+ FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads);
+
+ FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads)
+ : FNET_Transport(std::move(resolver), vespalib::CryptoEngine::get_default(), num_threads) {}
+ FNET_Transport(vespalib::CryptoEngine::SP crypto, size_t num_threads)
+ : FNET_Transport(vespalib::AsyncResolver::get_shared(), std::move(crypto), num_threads) {}
+ FNET_Transport(size_t num_threads)
+ : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), num_threads) {}
+ FNET_Transport()
+ : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), 1) {}
~FNET_Transport();
/**
@@ -57,6 +66,19 @@ public:
vespalib::AsyncResolver::ResultHandler::WP result_handler);
/**
+ * Wrap a plain socket endpoint in a CryptoSocket. The
+ * implementation will be determined by the CryptoEngine used by
+ * this Transport.
+ *
+ * @return socket abstraction able to perform encryption and decryption
+ * @param socket low-level socket
+ * @param is_server which end of the connection the socket
+ * represents. This is needed to support
+ * asymmetrical handshaking.
+ **/
+ vespalib::CryptoSocket::UP create_crypto_socket(vespalib::SocketHandle socket, bool is_server);
+
+ /**
* Select one of the underlying transport threads. The selection
* is based on hashing the given key as well as the current stack
* pointer.