From a23d7d3af0ceb2c28ccc878a8f953c8c591b3b8a Mon Sep 17 00:00:00 2001 From: Arne H Juul Date: Mon, 13 Mar 2017 13:52:58 +0100 Subject: perform async connect in same thread * always set socket non-blocking in Init() before calling Connect() * pick up async connect result in HandleWriteEvent * also revert "set incoming connections in non-blocking mode" since that is now always done in Init() * revert "closed while not added to event loop" handling (there is no longer a window where a connection can be closed before it is added to the transport event loop) * remove component for separate connect thread --- fnet/CMakeLists.txt | 1 - fnet/src/testlist.txt | 1 - fnet/src/tests/connect_thread/.gitignore | 1 - fnet/src/tests/connect_thread/CMakeLists.txt | 8 ---- .../tests/connect_thread/connect_thread_test.cpp | 27 ----------- fnet/src/tests/info/info.cpp | 2 +- fnet/src/vespa/fnet/CMakeLists.txt | 1 - fnet/src/vespa/fnet/connect_thread.cpp | 55 ---------------------- fnet/src/vespa/fnet/connect_thread.h | 37 --------------- fnet/src/vespa/fnet/connection.cpp | 53 ++++++++++++++------- fnet/src/vespa/fnet/connection.h | 11 +---- fnet/src/vespa/fnet/connector.cpp | 2 +- fnet/src/vespa/fnet/iocomponent.h | 4 +- fnet/src/vespa/fnet/transport.cpp | 11 ++--- fnet/src/vespa/fnet/transport.h | 14 ------ fnet/src/vespa/fnet/transport_thread.cpp | 22 +++------ 16 files changed, 51 insertions(+), 199 deletions(-) delete mode 100644 fnet/src/tests/connect_thread/.gitignore delete mode 100644 fnet/src/tests/connect_thread/CMakeLists.txt delete mode 100644 fnet/src/tests/connect_thread/connect_thread_test.cpp delete mode 100644 fnet/src/vespa/fnet/connect_thread.cpp delete mode 100644 fnet/src/vespa/fnet/connect_thread.h diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt index b55d96ceaa4..cc18e276a32 100644 --- a/fnet/CMakeLists.txt +++ b/fnet/CMakeLists.txt @@ -14,7 +14,6 @@ vespa_define_module( src/examples/ping src/examples/proxy src/examples/timeout - src/tests/connect_thread src/tests/connection_spread src/tests/databuffer src/tests/examples diff --git a/fnet/src/testlist.txt b/fnet/src/testlist.txt index 4aaef3ebfcb..39ebc3d3d4b 100644 --- a/fnet/src/testlist.txt +++ b/fnet/src/testlist.txt @@ -1,4 +1,3 @@ -tests/connect_thread tests/connection_spread tests/databuffer tests/examples diff --git a/fnet/src/tests/connect_thread/.gitignore b/fnet/src/tests/connect_thread/.gitignore deleted file mode 100644 index 66bba07002d..00000000000 --- a/fnet/src/tests/connect_thread/.gitignore +++ /dev/null @@ -1 +0,0 @@ -fnet_connect_thread_test_app diff --git a/fnet/src/tests/connect_thread/CMakeLists.txt b/fnet/src/tests/connect_thread/CMakeLists.txt deleted file mode 100644 index 337ab336a7b..00000000000 --- a/fnet/src/tests/connect_thread/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(fnet_connect_thread_test_app TEST - SOURCES - connect_thread_test.cpp - DEPENDS - fnet -) -vespa_add_test(NAME fnet_connect_thread_test_app COMMAND fnet_connect_thread_test_app) diff --git a/fnet/src/tests/connect_thread/connect_thread_test.cpp b/fnet/src/tests/connect_thread/connect_thread_test.cpp deleted file mode 100644 index b5304cf9b9e..00000000000 --- a/fnet/src/tests/connect_thread/connect_thread_test.cpp +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include -#include -#include - -struct MyConn : public fnet::ExtConnectable { - bool connected = false; - void ext_connect() override { connected = true; } -}; - -TEST("require that connect thread will connect stuff") { - std::vector conns(5); - { - fnet::ConnectThread thread; - thread.connect_later(&conns[0]); - thread.connect_later(&conns[2]); - thread.connect_later(&conns[4]); - } - EXPECT_TRUE(conns[0].connected); - EXPECT_TRUE(!conns[1].connected); - EXPECT_TRUE(conns[2].connected); - EXPECT_TRUE(!conns[3].connected); - EXPECT_TRUE(conns[4].connected); -} - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index 12320427053..16d9d548ebf 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -73,7 +73,7 @@ TEST("size of important objects") EXPECT_EQUAL(184u, sizeof(FNET_IOComponent)); EXPECT_EQUAL(32u, sizeof(FNET_Channel)); EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock)); - EXPECT_EQUAL(480u, sizeof(FNET_Connection)); + EXPECT_EQUAL(472u, sizeof(FNET_Connection)); EXPECT_EQUAL(96u, sizeof(FastOS_Cond)); EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer)); EXPECT_EQUAL(24u, sizeof(FastOS_Time)); diff --git a/fnet/src/vespa/fnet/CMakeLists.txt b/fnet/src/vespa/fnet/CMakeLists.txt index 61f07d08b53..befe0f4ffa2 100644 --- a/fnet/src/vespa/fnet/CMakeLists.txt +++ b/fnet/src/vespa/fnet/CMakeLists.txt @@ -4,7 +4,6 @@ vespa_add_library(fnet channel.cpp channellookup.cpp config.cpp - connect_thread.cpp connection.cpp connector.cpp context.cpp diff --git a/fnet/src/vespa/fnet/connect_thread.cpp b/fnet/src/vespa/fnet/connect_thread.cpp deleted file mode 100644 index 8c1b38596de..00000000000 --- a/fnet/src/vespa/fnet/connect_thread.cpp +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "connect_thread.h" - -namespace fnet { - -void -ConnectThread::run() -{ - for (;;) { - Guard guard(_lock); - while (!_done && _queue.empty()) { - _cond.wait(guard); - } - if (_done && _queue.empty()) { - return; - } - assert(!_queue.empty()); - ExtConnectable *conn = _queue.front(); - _queue.pop(); - guard.unlock(); // UNLOCK - conn->ext_connect(); - } -} - -ConnectThread::ConnectThread() - : _lock(), - _cond(), - _queue(), - _done(false), - _thread(&ConnectThread::run, this) -{ -} - -ConnectThread::~ConnectThread() -{ - { - Guard guard(_lock); - _done = true; - _cond.notify_one(); - } - _thread.join(); - assert(_queue.empty()); -} - -void -ConnectThread::connect_later(ExtConnectable *conn) -{ - Guard guard(_lock); - assert(!_done); - _queue.push(conn); - _cond.notify_one(); -} - -} // namespace fnet diff --git a/fnet/src/vespa/fnet/connect_thread.h b/fnet/src/vespa/fnet/connect_thread.h deleted file mode 100644 index e963187f99d..00000000000 --- a/fnet/src/vespa/fnet/connect_thread.h +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - - -#pragma once - -#include "ext_connectable.h" -#include -#include -#include -#include - -namespace fnet { - -/** - * An object encapsulating a thread responsible for doing synchronous - * external connect. - **/ -class ConnectThread -{ -private: - using Guard = std::unique_lock; - - std::mutex _lock; - std::condition_variable _cond; - vespalib::ArrayQueue _queue; - bool _done; - std::thread _thread; - - void run(); - -public: - ConnectThread(); - ~ConnectThread(); - void connect_later(ExtConnectable *conn); -}; - -} // namespace fnet diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index 5fc9c62db92..beb7a16abbb 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -480,12 +480,29 @@ FNET_Connection::~FNET_Connection() bool FNET_Connection::Init() { - bool rc = _socket->TuneTransport(); + bool rc = _socket->SetSoBlocking(false) + && _socket->TuneTransport(); + if (rc) { if (GetConfig()->_tcpNoDelay) _socket->SetNoDelay(true); EnableReadEvent(true); - EnableWriteEvent(true); + + if (IsClient()) { + if (_socket->Connect()) { + SetState(FNET_CONNECTED); + } else { + int error = FastOS_Socket::GetLastError(); + + if (error == FastOS_Socket::ERR_INPROGRESS || + error == FastOS_Socket::ERR_WOULDBLOCK) { + EnableWriteEvent(true); + } else { + rc = false; + LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error); + } + } + } } // init server admin channel @@ -501,19 +518,6 @@ FNET_Connection::Init() return (rc && _state <= FNET_CONNECTED); } -void -FNET_Connection::ext_connect() -{ - bool rc = _socket->Connect() // NB: sync connect - && _socket->SetSoBlocking(false); - if (rc) { - SetState(FNET_CONNECTED); - Owner()->Add(this, /* needRef = */ false); - } else { - Owner()->Add(this, /* needRef = */ true); - Owner()->Close(this, /* needRef = */ false); - } -} void FNET_Connection::SetCleanupHandler(FNET_IConnectionCleanupHandler *handler) @@ -710,10 +714,27 @@ FNET_Connection::HandleReadEvent() bool FNET_Connection::HandleWriteEvent() { + int error; // socket error code bool broken = false; // is connection broken ? switch(_state) { - case FNET_CONNECTING: // ignore write events while connecting + case FNET_CONNECTING: + error = _socket->GetSoError(); + if (error == 0) { // connect ok + Lock(); + _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) + LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), + GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); + bool writePending = (_writeWork > 0); + Unlock(); + if (!writePending) + EnableWriteEvent(false); + } else { + LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error); + + SetState(FNET_CLOSED); // connect failed. + broken = true; + } break; case FNET_CONNECTED: Lock(); diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 3374e0b4fc3..12b3c85b672 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -2,7 +2,6 @@ #pragma once -#include "ext_connectable.h" #include "iocomponent.h" #include "databuffer.h" #include "context.h" @@ -46,8 +45,7 @@ public: * connection. Only the client side may open new channels on the * connection. **/ -class FNET_Connection : public FNET_IOComponent, - public fnet::ExtConnectable +class FNET_Connection : public FNET_IOComponent { public: enum State { @@ -319,13 +317,6 @@ public: **/ bool Init(); - /** - * Performs sync socket connect, sync connect post-setup and adds - * this connection to the event loop. Calling this function - * implicitly gives away 1 reference to this object. - **/ - void ext_connect() override; - /** * Register a cleanup handler to be invoked when this connection is * about to be destructed. diff --git a/fnet/src/vespa/fnet/connector.cpp b/fnet/src/vespa/fnet/connector.cpp index 5cb27335523..a14a798d7c6 100644 --- a/fnet/src/vespa/fnet/connector.cpp +++ b/fnet/src/vespa/fnet/connector.cpp @@ -87,7 +87,7 @@ FNET_Connector::HandleReadEvent() FNET_Transport &transport = Owner()->owner(); FNET_TransportThread *thread = transport.select_thread(newSocket, sizeof(FastOS_Socket)); conn = new FNET_Connection(thread, _streamer, _serverAdapter, newSocket, GetSpec()); - if (newSocket->SetSoBlocking(false) && conn->Init()) { + if (conn->Init()) { conn->Owner()->Add(conn, false); } else { LOG(debug, "Connector(%s): failed to init incoming connection", diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index d04e83830c3..cb719c3051d 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -31,15 +31,13 @@ class FNET_IOComponent _ioc_writeEnabled(false), _ioc_shouldTimeOut(shouldTimeout), _ioc_added(false), - _ioc_delete(false), - _ioc_want_close(false) + _ioc_delete(false) { } bool _ioc_readEnabled; // read event enabled ? bool _ioc_writeEnabled; // write event enabled ? bool _ioc_shouldTimeOut; // component should timeout ? bool _ioc_added; // was added to event loop bool _ioc_delete; // going down... - bool _ioc_want_close; // closed while not added to event loop }; protected: FNET_IOComponent *_ioc_next; // next in list diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index 129017b53f7..067016872e3 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -1,9 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "transport.h" -#include "connect_thread.h" #include "transport_thread.h" #include "iocomponent.h" +#include #include namespace { @@ -23,8 +23,7 @@ struct HashState { } // namespace FNET_Transport::FNET_Transport(size_t num_threads) - : _threads(), - _connect_thread(std::make_unique()) + : _threads() { assert(num_threads >= 1); for (size_t i = 0; i < num_threads; ++i) { @@ -34,10 +33,6 @@ FNET_Transport::FNET_Transport(size_t num_threads) FNET_Transport::~FNET_Transport() { } -void FNET_Transport::connect_later(fnet::ExtConnectable *conn) { - _connect_thread->connect_later(conn); -} - FNET_TransportThread * FNET_Transport::select_thread(const void *key, size_t key_len) const { @@ -220,4 +215,4 @@ void FNET_Transport::Main() { assert(_threads.size() == 1); _threads[0]->Main(); -} \ No newline at end of file +} diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index dc7b1d7f626..c0d9a86c08e 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -2,7 +2,6 @@ #pragma once -#include "ext_connectable.h" #include "context.h" #include #include @@ -16,11 +15,6 @@ class FNET_IServerAdapter; class FNET_IPacketHandler; class FNET_Scheduler; -namespace fnet { - class ConnectThread; - class ExtConnectable; -} - /** * This class represents the transport layer and handles a collection * of transport threads. Note: remember to shut down your transport @@ -33,7 +27,6 @@ private: using Threads = std::vector; Threads _threads; - std::unique_ptr _connect_thread; public: /** @@ -46,13 +39,6 @@ public: FNET_Transport(size_t num_threads = 1); ~FNET_Transport(); - /** - * Calling this function gives away 1 reference to 'conn' and - * ensures that the 'ext_connect' function will be called on it - * from another thread some time in the future. - **/ - void connect_later(fnet::ExtConnectable *conn); - /** * Select one of the underlying transport threads. The selection * is based on hashing the given key as well as the current stack diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 6ded04822cf..e8da525dbb9 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -124,7 +124,6 @@ void FNET_TransportThread::AddDeleteComponent(FNET_IOComponent *comp) { assert(!comp->_flags._ioc_delete); - comp->_flags._ioc_added = false; comp->_flags._ioc_delete = true; comp->_ioc_prev = nullptr; comp->_ioc_next = _deleteList; @@ -351,7 +350,7 @@ FNET_TransportThread::Connect(const char *spec, FNET_IPacketStreamer *streamer, connContext, mysocket, spec); if (conn->Init()) { conn->AddRef_NoLock(); - owner().connect_later(conn); + Add(conn, /* needRef = */ false); return conn; } else { delete conn; @@ -576,14 +575,9 @@ FNET_TransportThread::EventLoopIteration() switch (packet->GetCommand()) { case FNET_ControlPacket::FNET_CMD_IOC_ADD: - if (context._value.IOC->_flags._ioc_want_close) { - context._value.IOC->Close(); - context._value.IOC->SubRef(); - } else { - AddComponent(context._value.IOC); - context._value.IOC->_flags._ioc_added = true; - context._value.IOC->SetSocketEvent(&_socketEvent); - } + AddComponent(context._value.IOC); + context._value.IOC->_flags._ioc_added = true; + context._value.IOC->SetSocketEvent(&_socketEvent); break; case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_READ: context._value.IOC->EnableReadEvent(true); @@ -604,12 +598,10 @@ FNET_TransportThread::EventLoopIteration() case FNET_ControlPacket::FNET_CMD_IOC_CLOSE: if (context._value.IOC->_flags._ioc_added) { RemoveComponent(context._value.IOC); - context._value.IOC->Close(); - AddDeleteComponent(context._value.IOC); - } else { - context._value.IOC->_flags._ioc_want_close = true; + context._value.IOC->SubRef(); } - context._value.IOC->SubRef(); + context._value.IOC->Close(); + AddDeleteComponent(context._value.IOC); break; case FNET_ControlPacket::FNET_CMD_EXECUTE: context._value.EXECUTABLE->execute(); -- cgit v1.2.3