diff options
Diffstat (limited to 'fnet')
-rw-r--r-- | fnet/CMakeLists.txt | 1 | ||||
-rw-r--r-- | fnet/src/tests/connect/CMakeLists.txt | 8 | ||||
-rw-r--r-- | fnet/src/tests/connect/connect_test.cpp | 122 | ||||
-rw-r--r-- | fnet/src/tests/info/info.cpp | 2 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 54 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 23 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.cpp | 7 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.h | 11 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.cpp | 17 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 19 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 25 |
11 files changed, 264 insertions, 25 deletions
diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt index b18399960f5..142c19fca3e 100644 --- a/fnet/CMakeLists.txt +++ b/fnet/CMakeLists.txt @@ -14,6 +14,7 @@ vespa_define_module( src/examples/ping src/examples/proxy src/examples/timeout + src/tests/connect src/tests/connection_spread src/tests/databuffer src/tests/examples diff --git a/fnet/src/tests/connect/CMakeLists.txt b/fnet/src/tests/connect/CMakeLists.txt new file mode 100644 index 00000000000..35ff9131dff --- /dev/null +++ b/fnet/src/tests/connect/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(fnet_connect_test_app TEST + SOURCES + connect_test.cpp + DEPENDS + fnet +) +vespa_add_test(NAME fnet_connect_test_app COMMAND fnet_connect_test_app) diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp new file mode 100644 index 00000000000..5e48390a297 --- /dev/null +++ b/fnet/src/tests/connect/connect_test.cpp @@ -0,0 +1,122 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/fnet/fnet.h> +#include <vespa/vespalib/net/server_socket.h> +#include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/util/stringfmt.h> + +using namespace vespalib; + +struct BlockingHostResolver : public AsyncResolver::HostResolver { + AsyncResolver::SimpleHostResolver resolver; + Gate caller; + Gate barrier; + BlockingHostResolver() : resolver(), caller(), barrier() {} + vespalib::string ip_address(const vespalib::string &host) override { + fprintf(stderr, "blocking resolve request: '%s'\n", host.c_str()); + caller.countDown(); + barrier.await(); + vespalib::string result = resolver.ip_address(host); + fprintf(stderr, "returning resolve result: '%s'\n", result.c_str()); + return result; + } + void wait_for_caller() { caller.await(); } + void release_caller() { barrier.countDown(); } +}; + +AsyncResolver::SP make_resolver(AsyncResolver::HostResolver::SP host_resolver) { + AsyncResolver::Params params; + params.resolver = host_resolver; + return AsyncResolver::create(params); +} + +//----------------------------------------------------------------------------- + +struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { + FNET_SimplePacketStreamer streamer; + FastOS_ThreadPool pool; + FNET_Transport transport; + Gate conn_lost; + Gate conn_deleted; + TransportFixture() : streamer(nullptr), pool(128 * 1024), transport(), + conn_lost(), conn_deleted() + { + transport.Start(&pool); + } + TransportFixture(AsyncResolver::HostResolver::SP host_resolver) + : streamer(nullptr), pool(128 * 1024), transport(make_resolver(std::move(host_resolver)), 1), + conn_lost(), conn_deleted() + { + transport.Start(&pool); + } + HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context) override { + ASSERT_TRUE(packet->GetCommand() == FNET_ControlPacket::FNET_CMD_CHANNEL_LOST); + conn_lost.countDown(); + packet->Free(); + return FNET_FREE_CHANNEL; + } + void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); } + FNET_Connection *connect(const vespalib::string &spec) { + FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer, this); + ASSERT_TRUE(conn != nullptr); + conn->SetCleanupHandler(this); + return conn; + } + ~TransportFixture() { + transport.ShutDown(true); + pool.Close(); + } +}; + +//----------------------------------------------------------------------------- + +TEST_MT_FFF("require that normal connect works", 2, + ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60)) +{ + if (thread_id == 0) { + SocketHandle socket = f1.accept(); + EXPECT_TRUE(socket.valid()); + TEST_BARRIER(); + } else { + vespalib::string spec = make_string("tcp/localhost:%d", f1.address().port()); + FNET_Connection *conn = f2.connect(spec); + TEST_BARRIER(); + conn->Owner()->Close(conn); + EXPECT_TRUE(f2.conn_lost.await(60000)); + EXPECT_TRUE(!f2.conn_deleted.await(20)); + conn->SubRef(); + EXPECT_TRUE(f2.conn_deleted.await(60000)); + } +} + +TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) { + FNET_Connection *conn = f1.connect("invalid"); + EXPECT_TRUE(f1.conn_lost.await(60000)); + EXPECT_TRUE(!f1.conn_deleted.await(20)); + conn->SubRef(); + EXPECT_TRUE(f1.conn_deleted.await(60000)); +} + +TEST_MT_FFFF("require that async close can be called before async resolve completes", 2, + ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()), + TransportFixture(f2), TimeBomb(60)) +{ + if (thread_id == 0) { + SocketHandle socket = f1.accept(); + EXPECT_TRUE(!socket.valid()); + } else { + vespalib::string spec = make_string("tcp/localhost:%d", f1.address().port()); + FNET_Connection *conn = f3.connect(spec); + f2->wait_for_caller(); + conn->Owner()->Close(conn); + EXPECT_TRUE(f3.conn_lost.await(60000)); + f2->release_caller(); + EXPECT_TRUE(!f3.conn_deleted.await(20)); + conn->SubRef(); + EXPECT_TRUE(f3.conn_deleted.await(60000)); + f1.shutdown(); + } +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index 5b93c3fcd3f..ea8de81cff5 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -73,7 +73,7 @@ TEST("size of important objects") EXPECT_EQUAL(192u, sizeof(FNET_IOComponent)); EXPECT_EQUAL(32u, sizeof(FNET_Channel)); EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock)); - EXPECT_EQUAL(480u, sizeof(FNET_Connection)); + EXPECT_EQUAL(496u, sizeof(FNET_Connection)); EXPECT_EQUAL(96u, sizeof(FastOS_Cond)); EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer)); EXPECT_EQUAL(24u, sizeof(FastOS_Time)); diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index f1ed051bdc1..180e7ea9adf 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -8,6 +8,7 @@ #include "iserveradapter.h" #include "config.h" #include "transport_thread.h" +#include "transport.h" #include <vespa/log/log.h> LOG_SETUP(".fnet"); @@ -49,9 +50,29 @@ SyncPacket::Free() _cond.Signal(); _cond.Unlock(); } +} + +FNET_Connection::ResolveHandler::ResolveHandler(FNET_Connection *conn) + : connection(conn), + address() +{ + connection->AddRef(); +} + +void +FNET_Connection::ResolveHandler::handle_result(vespalib::SocketAddress result) +{ + address = result; + connection->Owner()->Add(connection); } +FNET_Connection::ResolveHandler::~ResolveHandler() +{ + connection->SubRef(); +} + + /////////////////////// // PROTECTED METHODS // /////////////////////// @@ -394,6 +415,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _serverAdapter(serverAdapter), _adminChannel(nullptr), _socket(std::move(socket)), + _resolve_handler(nullptr), _context(), _state(FNET_CONNECTED), // <-- NB _flags(), @@ -422,13 +444,13 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, FNET_IPacketHandler *adminHandler, FNET_Context adminContext, FNET_Context context, - vespalib::SocketHandle socket, const char *spec) - : FNET_IOComponent(owner, socket.get(), spec, /* time-out = */ true), + : FNET_IOComponent(owner, -1, spec, /* time-out = */ true), _streamer(streamer), _serverAdapter(serverAdapter), _adminChannel(nullptr), - _socket(std::move(socket)), + _socket(), + _resolve_handler(nullptr), _context(context), _state(FNET_CONNECTING), _flags(), @@ -445,7 +467,6 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _callbackTarget(nullptr), _cleanup(nullptr) { - assert(_socket.valid()); if (adminHandler != nullptr) { FNET_Channel::UP admin(new FNET_Channel(FNET_NOID, this, adminHandler, adminContext)); _adminChannel = admin.get(); @@ -484,7 +505,29 @@ FNET_Connection::Init() } // handle close by admin channel init - return (_state <= FNET_CONNECTED); + if (_state == FNET_CLOSED) { + return false; + } + + // initiate async resolve + if (IsClient()) { + _resolve_handler = std::make_shared<ResolveHandler>(this); + Owner()->owner().resolve_async(GetSpec(), _resolve_handler); + } + return true; +} + + +bool +FNET_Connection::handle_add_event() +{ + if (_resolve_handler) { + auto tweak = [this](vespalib::SocketHandle &handle) { return Owner()->tune(handle); }; + _socket = _resolve_handler->address.connect(tweak); + _ioc_socket_fd = _socket.get(); + _resolve_handler.reset(); + } + return _socket.valid(); } @@ -653,6 +696,7 @@ FNET_Connection::CleanupHook() void FNET_Connection::Close() { + _resolve_handler.reset(); detach_selector(); SetState(FNET_CLOSED); _ioc_socket_fd = -1; diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 511d50a451e..699516554e2 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -8,6 +8,7 @@ #include "channellookup.h" #include "packetqueue.h" #include <vespa/vespalib/net/socket_handle.h> +#include <vespa/vespalib/net/async_resolver.h> class FNET_IPacketStreamer; class FNET_IServerAdapter; @@ -77,10 +78,19 @@ private: bool _callbackWait; bool _discarding; }; + struct ResolveHandler : public vespalib::AsyncResolver::ResultHandler { + FNET_Connection *connection; + vespalib::SocketAddress address; + ResolveHandler(FNET_Connection *conn); + void handle_result(vespalib::SocketAddress result) override; + ~ResolveHandler(); + }; + using ResolveHandlerSP = std::shared_ptr<ResolveHandler>; FNET_IPacketStreamer *_streamer; // custom packet streamer FNET_IServerAdapter *_serverAdapter; // only on server side FNET_Channel *_adminChannel; // only on client side vespalib::SocketHandle _socket; // socket for this conn + ResolveHandlerSP _resolve_handler; // async resolve callback FNET_Context _context; // connection context State _state; // connection state Flags _flags; // Packed flags. @@ -231,7 +241,6 @@ public: * @param adminHandler packet handler for admin channel * @param adminContext context for admin channel * @param context initial context for this connection - * @param socket the underlying socket used for IO * @param spec connect spec **/ FNET_Connection(FNET_TransportThread *owner, @@ -240,7 +249,6 @@ public: FNET_IPacketHandler *adminHandler, FNET_Context adminContext, FNET_Context context, - vespalib::SocketHandle socket, const char *spec); /** @@ -318,6 +326,17 @@ public: bool Init(); /** + * Called by the transport thread as the initial part of adding + * this connection to the selection loop. If this is an incoming + * connection (already connected) this function does nothing. If + * this is an outgoing connection this function will use the async + * resolve result to create a socket and initiate async connect. + * + * @return false if connection broken, true otherwise. + **/ + bool handle_add_event() override; + + /** * Register a cleanup handler to be invoked when this connection is * about to be destructed. * diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index a3c3911db54..52c9cfd09b5 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -136,6 +136,13 @@ FNET_IOComponent::EnableWriteEvent(bool enabled) } +bool +FNET_IOComponent::handle_add_event() +{ + return true; +} + + void FNET_IOComponent::CleanupHook() { diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index 2e9bcb9fc0e..d0d8cee85b7 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -301,6 +301,17 @@ public: //----------- virtual methods below ----------------------// + /** + * This function is called as the first step of adding an io + * component to the selection loop. The default implementation + * will always return true. This can be overridden to perform + * delayed setup in the network thread. If this function returns + * false, the component is broken and should be closed + * immediately. + * + * @return false if broken, true otherwise. + **/ + virtual bool handle_add_event(); /** * This method is called by the SubRef methods just before the diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index 4177d80706d..675489fb229 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -22,8 +22,9 @@ struct HashState { } // namespace <unnamed> -FNET_Transport::FNET_Transport(size_t num_threads) - : _threads() +FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads) + : _async_resolver(std::move(resolver)), + _threads() { assert(num_threads >= 1); for (size_t i = 0; i < num_threads; ++i) { @@ -31,7 +32,17 @@ FNET_Transport::FNET_Transport(size_t num_threads) } } -FNET_Transport::~FNET_Transport() { } +FNET_Transport::~FNET_Transport() +{ + _async_resolver->wait_for_pending_resolves(); +} + +void +FNET_Transport::resolve_async(const vespalib::string &spec, + vespalib::AsyncResolver::ResultHandler::WP result_handler) +{ + _async_resolver->resolve_async(spec, std::move(result_handler)); +} FNET_TransportThread * FNET_Transport::select_thread(const void *key, size_t key_len) const diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 87dc54ddb86..8ad48d37cad 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -5,6 +5,7 @@ #include "context.h" #include <memory> #include <vector> +#include <vespa/vespalib/net/async_resolver.h> class FastOS_TimeInterface; class FNET_TransportThread; @@ -26,6 +27,7 @@ private: using Thread = std::unique_ptr<FNET_TransportThread>; using Threads = std::vector<Thread>; + vespalib::AsyncResolver::SP _async_resolver; Threads _threads; public: @@ -36,10 +38,25 @@ public: * the current thread become the transport thread. Main may only * be called for single-threaded transports. **/ - FNET_Transport(size_t num_threads = 1); + FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads); + FNET_Transport(size_t num_threads = 1) + : FNET_Transport(vespalib::AsyncResolver::get_shared(), num_threads) {} ~FNET_Transport(); /** + * Resolve a connect spec into a socket address to be used to + * connect to a remote socket. This operation will be performed + * asynchronously and the result will be given to the result + * handler when ready. The result handler may be discarded to + * cancel the request. + * + * @param spec connect spec + * @param result handler + **/ + void resolve_async(const vespalib::string &spec, + vespalib::AsyncResolver::ResultHandler::WP result_handler); + + /** * Select one of the underlying transport threads. The selection * is based on hashing the given key as well as the current stack * pointer. diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index ddeb6dfa93d..4a637b03532 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -276,16 +276,10 @@ FNET_TransportThread::Connect(const char *spec, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter, FNET_Context connContext) { - auto tweak = [this](SocketHandle &handle) { return tune(handle); }; - SocketHandle handle = SocketSpec(spec).client_address().connect(tweak); - if (handle.valid()) { - std::unique_ptr<FNET_Connection> conn = std::make_unique<FNET_Connection>(this, streamer, serverAdapter, - adminHandler, adminContext, connContext, std::move(handle), spec); - if (conn->Init()) { - conn->AddRef_NoLock(); - Add(conn.get(), /*needRef = */ false); - return conn.release(); - } + std::unique_ptr<FNET_Connection> conn = std::make_unique<FNET_Connection>(this, streamer, serverAdapter, + adminHandler, adminContext, connContext, spec); + if (conn->Init()) { + return conn.release(); } return nullptr; } @@ -460,9 +454,14 @@ FNET_TransportThread::handle_wakeup() switch (packet->GetCommand()) { case FNET_ControlPacket::FNET_CMD_IOC_ADD: - AddComponent(context._value.IOC); - context._value.IOC->_flags._ioc_added = true; - context._value.IOC->attach_selector(_selector); + if (context._value.IOC->handle_add_event()) { + AddComponent(context._value.IOC); + context._value.IOC->_flags._ioc_added = true; + context._value.IOC->attach_selector(_selector); + } else { + context._value.IOC->Close(); + AddDeleteComponent(context._value.IOC); + } break; case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_READ: context._value.IOC->EnableReadEvent(true); |