summaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahoo-inc.com>2017-03-13 13:52:58 +0100
committerArne H Juul <arnej@yahoo-inc.com>2017-03-13 15:28:29 +0100
commita23d7d3af0ceb2c28ccc878a8f953c8c591b3b8a (patch)
treeadd02a89be59ffbabaf938f29f96fafc0c9110ae /fnet
parent720617cf9e441059558870525cfefe94a85df788 (diff)
perform async connect in same thread
* always set socket non-blocking in Init() before calling Connect() * pick up async connect result in HandleWriteEvent * also revert "set incoming connections in non-blocking mode" since that is now always done in Init() * revert "closed while not added to event loop" handling (there is no longer a window where a connection can be closed before it is added to the transport event loop) * remove component for separate connect thread
Diffstat (limited to 'fnet')
-rw-r--r--fnet/CMakeLists.txt1
-rw-r--r--fnet/src/testlist.txt1
-rw-r--r--fnet/src/tests/connect_thread/.gitignore1
-rw-r--r--fnet/src/tests/connect_thread/CMakeLists.txt8
-rw-r--r--fnet/src/tests/connect_thread/connect_thread_test.cpp27
-rw-r--r--fnet/src/tests/info/info.cpp2
-rw-r--r--fnet/src/vespa/fnet/CMakeLists.txt1
-rw-r--r--fnet/src/vespa/fnet/connect_thread.cpp55
-rw-r--r--fnet/src/vespa/fnet/connect_thread.h37
-rw-r--r--fnet/src/vespa/fnet/connection.cpp53
-rw-r--r--fnet/src/vespa/fnet/connection.h11
-rw-r--r--fnet/src/vespa/fnet/connector.cpp2
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h4
-rw-r--r--fnet/src/vespa/fnet/transport.cpp11
-rw-r--r--fnet/src/vespa/fnet/transport.h14
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp22
16 files changed, 51 insertions, 199 deletions
diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt
index b55d96ceaa4..cc18e276a32 100644
--- a/fnet/CMakeLists.txt
+++ b/fnet/CMakeLists.txt
@@ -14,7 +14,6 @@ vespa_define_module(
src/examples/ping
src/examples/proxy
src/examples/timeout
- src/tests/connect_thread
src/tests/connection_spread
src/tests/databuffer
src/tests/examples
diff --git a/fnet/src/testlist.txt b/fnet/src/testlist.txt
index 4aaef3ebfcb..39ebc3d3d4b 100644
--- a/fnet/src/testlist.txt
+++ b/fnet/src/testlist.txt
@@ -1,4 +1,3 @@
-tests/connect_thread
tests/connection_spread
tests/databuffer
tests/examples
diff --git a/fnet/src/tests/connect_thread/.gitignore b/fnet/src/tests/connect_thread/.gitignore
deleted file mode 100644
index 66bba07002d..00000000000
--- a/fnet/src/tests/connect_thread/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-fnet_connect_thread_test_app
diff --git a/fnet/src/tests/connect_thread/CMakeLists.txt b/fnet/src/tests/connect_thread/CMakeLists.txt
deleted file mode 100644
index 337ab336a7b..00000000000
--- a/fnet/src/tests/connect_thread/CMakeLists.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(fnet_connect_thread_test_app TEST
- SOURCES
- connect_thread_test.cpp
- DEPENDS
- fnet
-)
-vespa_add_test(NAME fnet_connect_thread_test_app COMMAND fnet_connect_thread_test_app)
diff --git a/fnet/src/tests/connect_thread/connect_thread_test.cpp b/fnet/src/tests/connect_thread/connect_thread_test.cpp
deleted file mode 100644
index b5304cf9b9e..00000000000
--- a/fnet/src/tests/connect_thread/connect_thread_test.cpp
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vespalib/testkit/test_kit.h>
-#include <vespa/fnet/connect_thread.h>
-#include <vespa/fnet/ext_connectable.h>
-
-struct MyConn : public fnet::ExtConnectable {
- bool connected = false;
- void ext_connect() override { connected = true; }
-};
-
-TEST("require that connect thread will connect stuff") {
- std::vector<MyConn> conns(5);
- {
- fnet::ConnectThread thread;
- thread.connect_later(&conns[0]);
- thread.connect_later(&conns[2]);
- thread.connect_later(&conns[4]);
- }
- EXPECT_TRUE(conns[0].connected);
- EXPECT_TRUE(!conns[1].connected);
- EXPECT_TRUE(conns[2].connected);
- EXPECT_TRUE(!conns[3].connected);
- EXPECT_TRUE(conns[4].connected);
-}
-
-TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index 12320427053..16d9d548ebf 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -73,7 +73,7 @@ TEST("size of important objects")
EXPECT_EQUAL(184u, sizeof(FNET_IOComponent));
EXPECT_EQUAL(32u, sizeof(FNET_Channel));
EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock));
- EXPECT_EQUAL(480u, sizeof(FNET_Connection));
+ EXPECT_EQUAL(472u, sizeof(FNET_Connection));
EXPECT_EQUAL(96u, sizeof(FastOS_Cond));
EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer));
EXPECT_EQUAL(24u, sizeof(FastOS_Time));
diff --git a/fnet/src/vespa/fnet/CMakeLists.txt b/fnet/src/vespa/fnet/CMakeLists.txt
index 61f07d08b53..befe0f4ffa2 100644
--- a/fnet/src/vespa/fnet/CMakeLists.txt
+++ b/fnet/src/vespa/fnet/CMakeLists.txt
@@ -4,7 +4,6 @@ vespa_add_library(fnet
channel.cpp
channellookup.cpp
config.cpp
- connect_thread.cpp
connection.cpp
connector.cpp
context.cpp
diff --git a/fnet/src/vespa/fnet/connect_thread.cpp b/fnet/src/vespa/fnet/connect_thread.cpp
deleted file mode 100644
index 8c1b38596de..00000000000
--- a/fnet/src/vespa/fnet/connect_thread.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "connect_thread.h"
-
-namespace fnet {
-
-void
-ConnectThread::run()
-{
- for (;;) {
- Guard guard(_lock);
- while (!_done && _queue.empty()) {
- _cond.wait(guard);
- }
- if (_done && _queue.empty()) {
- return;
- }
- assert(!_queue.empty());
- ExtConnectable *conn = _queue.front();
- _queue.pop();
- guard.unlock(); // UNLOCK
- conn->ext_connect();
- }
-}
-
-ConnectThread::ConnectThread()
- : _lock(),
- _cond(),
- _queue(),
- _done(false),
- _thread(&ConnectThread::run, this)
-{
-}
-
-ConnectThread::~ConnectThread()
-{
- {
- Guard guard(_lock);
- _done = true;
- _cond.notify_one();
- }
- _thread.join();
- assert(_queue.empty());
-}
-
-void
-ConnectThread::connect_later(ExtConnectable *conn)
-{
- Guard guard(_lock);
- assert(!_done);
- _queue.push(conn);
- _cond.notify_one();
-}
-
-} // namespace fnet
diff --git a/fnet/src/vespa/fnet/connect_thread.h b/fnet/src/vespa/fnet/connect_thread.h
deleted file mode 100644
index e963187f99d..00000000000
--- a/fnet/src/vespa/fnet/connect_thread.h
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-
-#pragma once
-
-#include "ext_connectable.h"
-#include <vespa/vespalib/util/arrayqueue.hpp>
-#include <thread>
-#include <mutex>
-#include <condition_variable>
-
-namespace fnet {
-
-/**
- * An object encapsulating a thread responsible for doing synchronous
- * external connect.
- **/
-class ConnectThread
-{
-private:
- using Guard = std::unique_lock<std::mutex>;
-
- std::mutex _lock;
- std::condition_variable _cond;
- vespalib::ArrayQueue<ExtConnectable*> _queue;
- bool _done;
- std::thread _thread;
-
- void run();
-
-public:
- ConnectThread();
- ~ConnectThread();
- void connect_later(ExtConnectable *conn);
-};
-
-} // namespace fnet
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index 5fc9c62db92..beb7a16abbb 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -480,12 +480,29 @@ FNET_Connection::~FNET_Connection()
bool
FNET_Connection::Init()
{
- bool rc = _socket->TuneTransport();
+ bool rc = _socket->SetSoBlocking(false)
+ && _socket->TuneTransport();
+
if (rc) {
if (GetConfig()->_tcpNoDelay)
_socket->SetNoDelay(true);
EnableReadEvent(true);
- EnableWriteEvent(true);
+
+ if (IsClient()) {
+ if (_socket->Connect()) {
+ SetState(FNET_CONNECTED);
+ } else {
+ int error = FastOS_Socket::GetLastError();
+
+ if (error == FastOS_Socket::ERR_INPROGRESS ||
+ error == FastOS_Socket::ERR_WOULDBLOCK) {
+ EnableWriteEvent(true);
+ } else {
+ rc = false;
+ LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error);
+ }
+ }
+ }
}
// init server admin channel
@@ -501,19 +518,6 @@ FNET_Connection::Init()
return (rc && _state <= FNET_CONNECTED);
}
-void
-FNET_Connection::ext_connect()
-{
- bool rc = _socket->Connect() // NB: sync connect
- && _socket->SetSoBlocking(false);
- if (rc) {
- SetState(FNET_CONNECTED);
- Owner()->Add(this, /* needRef = */ false);
- } else {
- Owner()->Add(this, /* needRef = */ true);
- Owner()->Close(this, /* needRef = */ false);
- }
-}
void
FNET_Connection::SetCleanupHandler(FNET_IConnectionCleanupHandler *handler)
@@ -710,10 +714,27 @@ FNET_Connection::HandleReadEvent()
bool
FNET_Connection::HandleWriteEvent()
{
+ int error; // socket error code
bool broken = false; // is connection broken ?
switch(_state) {
- case FNET_CONNECTING: // ignore write events while connecting
+ case FNET_CONNECTING:
+ error = _socket->GetSoError();
+ if (error == 0) { // connect ok
+ Lock();
+ _state = FNET_CONNECTED; // SetState(FNET_CONNECTED)
+ LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
+ GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
+ bool writePending = (_writeWork > 0);
+ Unlock();
+ if (!writePending)
+ EnableWriteEvent(false);
+ } else {
+ LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error);
+
+ SetState(FNET_CLOSED); // connect failed.
+ broken = true;
+ }
break;
case FNET_CONNECTED:
Lock();
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 3374e0b4fc3..12b3c85b672 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -2,7 +2,6 @@
#pragma once
-#include "ext_connectable.h"
#include "iocomponent.h"
#include "databuffer.h"
#include "context.h"
@@ -46,8 +45,7 @@ public:
* connection. Only the client side may open new channels on the
* connection.
**/
-class FNET_Connection : public FNET_IOComponent,
- public fnet::ExtConnectable
+class FNET_Connection : public FNET_IOComponent
{
public:
enum State {
@@ -320,13 +318,6 @@ public:
bool Init();
/**
- * Performs sync socket connect, sync connect post-setup and adds
- * this connection to the event loop. Calling this function
- * implicitly gives away 1 reference to this object.
- **/
- void ext_connect() override;
-
- /**
* Register a cleanup handler to be invoked when this connection is
* about to be destructed.
*
diff --git a/fnet/src/vespa/fnet/connector.cpp b/fnet/src/vespa/fnet/connector.cpp
index 5cb27335523..a14a798d7c6 100644
--- a/fnet/src/vespa/fnet/connector.cpp
+++ b/fnet/src/vespa/fnet/connector.cpp
@@ -87,7 +87,7 @@ FNET_Connector::HandleReadEvent()
FNET_Transport &transport = Owner()->owner();
FNET_TransportThread *thread = transport.select_thread(newSocket, sizeof(FastOS_Socket));
conn = new FNET_Connection(thread, _streamer, _serverAdapter, newSocket, GetSpec());
- if (newSocket->SetSoBlocking(false) && conn->Init()) {
+ if (conn->Init()) {
conn->Owner()->Add(conn, false);
} else {
LOG(debug, "Connector(%s): failed to init incoming connection",
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index d04e83830c3..cb719c3051d 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -31,15 +31,13 @@ class FNET_IOComponent
_ioc_writeEnabled(false),
_ioc_shouldTimeOut(shouldTimeout),
_ioc_added(false),
- _ioc_delete(false),
- _ioc_want_close(false)
+ _ioc_delete(false)
{ }
bool _ioc_readEnabled; // read event enabled ?
bool _ioc_writeEnabled; // write event enabled ?
bool _ioc_shouldTimeOut; // component should timeout ?
bool _ioc_added; // was added to event loop
bool _ioc_delete; // going down...
- bool _ioc_want_close; // closed while not added to event loop
};
protected:
FNET_IOComponent *_ioc_next; // next in list
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index 129017b53f7..067016872e3 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -1,9 +1,9 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "transport.h"
-#include "connect_thread.h"
#include "transport_thread.h"
#include "iocomponent.h"
+#include <chrono>
#include <vespa/vespalib/xxhash/xxhash.h>
namespace {
@@ -23,8 +23,7 @@ struct HashState {
} // namespace <unnamed>
FNET_Transport::FNET_Transport(size_t num_threads)
- : _threads(),
- _connect_thread(std::make_unique<fnet::ConnectThread>())
+ : _threads()
{
assert(num_threads >= 1);
for (size_t i = 0; i < num_threads; ++i) {
@@ -34,10 +33,6 @@ FNET_Transport::FNET_Transport(size_t num_threads)
FNET_Transport::~FNET_Transport() { }
-void FNET_Transport::connect_later(fnet::ExtConnectable *conn) {
- _connect_thread->connect_later(conn);
-}
-
FNET_TransportThread *
FNET_Transport::select_thread(const void *key, size_t key_len) const
{
@@ -220,4 +215,4 @@ void
FNET_Transport::Main() {
assert(_threads.size() == 1);
_threads[0]->Main();
-} \ No newline at end of file
+}
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index dc7b1d7f626..c0d9a86c08e 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -2,7 +2,6 @@
#pragma once
-#include "ext_connectable.h"
#include "context.h"
#include <memory>
#include <vector>
@@ -16,11 +15,6 @@ class FNET_IServerAdapter;
class FNET_IPacketHandler;
class FNET_Scheduler;
-namespace fnet {
- class ConnectThread;
- class ExtConnectable;
-}
-
/**
* This class represents the transport layer and handles a collection
* of transport threads. Note: remember to shut down your transport
@@ -33,7 +27,6 @@ private:
using Threads = std::vector<Thread>;
Threads _threads;
- std::unique_ptr<fnet::ConnectThread> _connect_thread;
public:
/**
@@ -47,13 +40,6 @@ public:
~FNET_Transport();
/**
- * Calling this function gives away 1 reference to 'conn' and
- * ensures that the 'ext_connect' function will be called on it
- * from another thread some time in the future.
- **/
- void connect_later(fnet::ExtConnectable *conn);
-
- /**
* Select one of the underlying transport threads. The selection
* is based on hashing the given key as well as the current stack
* pointer.
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 6ded04822cf..e8da525dbb9 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -124,7 +124,6 @@ void
FNET_TransportThread::AddDeleteComponent(FNET_IOComponent *comp)
{
assert(!comp->_flags._ioc_delete);
- comp->_flags._ioc_added = false;
comp->_flags._ioc_delete = true;
comp->_ioc_prev = nullptr;
comp->_ioc_next = _deleteList;
@@ -351,7 +350,7 @@ FNET_TransportThread::Connect(const char *spec, FNET_IPacketStreamer *streamer,
connContext, mysocket, spec);
if (conn->Init()) {
conn->AddRef_NoLock();
- owner().connect_later(conn);
+ Add(conn, /* needRef = */ false);
return conn;
} else {
delete conn;
@@ -576,14 +575,9 @@ FNET_TransportThread::EventLoopIteration()
switch (packet->GetCommand()) {
case FNET_ControlPacket::FNET_CMD_IOC_ADD:
- if (context._value.IOC->_flags._ioc_want_close) {
- context._value.IOC->Close();
- context._value.IOC->SubRef();
- } else {
- AddComponent(context._value.IOC);
- context._value.IOC->_flags._ioc_added = true;
- context._value.IOC->SetSocketEvent(&_socketEvent);
- }
+ AddComponent(context._value.IOC);
+ context._value.IOC->_flags._ioc_added = true;
+ context._value.IOC->SetSocketEvent(&_socketEvent);
break;
case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_READ:
context._value.IOC->EnableReadEvent(true);
@@ -604,12 +598,10 @@ FNET_TransportThread::EventLoopIteration()
case FNET_ControlPacket::FNET_CMD_IOC_CLOSE:
if (context._value.IOC->_flags._ioc_added) {
RemoveComponent(context._value.IOC);
- context._value.IOC->Close();
- AddDeleteComponent(context._value.IOC);
- } else {
- context._value.IOC->_flags._ioc_want_close = true;
+ context._value.IOC->SubRef();
}
- context._value.IOC->SubRef();
+ context._value.IOC->Close();
+ AddDeleteComponent(context._value.IOC);
break;
case FNET_ControlPacket::FNET_CMD_EXECUTE:
context._value.EXECUTABLE->execute();