aboutsummaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHaavard <havardpe@yahoo-inc.com>2017-07-07 13:11:26 +0000
committerHaavard <havardpe@yahoo-inc.com>2017-07-10 13:32:01 +0000
commit6a314a55ab6709e87264cd148d2efc3f50db1864 (patch)
treebdb4a905d86534ae712b759d28357b0351daad72 /fnet
parente522f9accfee1a5b616d284dffb5d035ffc0a5d3 (diff)
handle async resolve
Diffstat (limited to 'fnet')
-rw-r--r--fnet/CMakeLists.txt1
-rw-r--r--fnet/src/tests/connect/CMakeLists.txt8
-rw-r--r--fnet/src/tests/connect/connect_test.cpp122
-rw-r--r--fnet/src/tests/info/info.cpp2
-rw-r--r--fnet/src/vespa/fnet/connection.cpp54
-rw-r--r--fnet/src/vespa/fnet/connection.h23
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp7
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h11
-rw-r--r--fnet/src/vespa/fnet/transport.cpp17
-rw-r--r--fnet/src/vespa/fnet/transport.h19
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp25
11 files changed, 264 insertions, 25 deletions
diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt
index b18399960f5..142c19fca3e 100644
--- a/fnet/CMakeLists.txt
+++ b/fnet/CMakeLists.txt
@@ -14,6 +14,7 @@ vespa_define_module(
src/examples/ping
src/examples/proxy
src/examples/timeout
+ src/tests/connect
src/tests/connection_spread
src/tests/databuffer
src/tests/examples
diff --git a/fnet/src/tests/connect/CMakeLists.txt b/fnet/src/tests/connect/CMakeLists.txt
new file mode 100644
index 00000000000..35ff9131dff
--- /dev/null
+++ b/fnet/src/tests/connect/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(fnet_connect_test_app TEST
+ SOURCES
+ connect_test.cpp
+ DEPENDS
+ fnet
+)
+vespa_add_test(NAME fnet_connect_test_app COMMAND fnet_connect_test_app)
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp
new file mode 100644
index 00000000000..5e48390a297
--- /dev/null
+++ b/fnet/src/tests/connect/connect_test.cpp
@@ -0,0 +1,122 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/fnet/fnet.h>
+#include <vespa/vespalib/net/server_socket.h>
+#include <vespa/vespalib/util/sync.h>
+#include <vespa/vespalib/util/stringfmt.h>
+
+using namespace vespalib;
+
+struct BlockingHostResolver : public AsyncResolver::HostResolver {
+ AsyncResolver::SimpleHostResolver resolver;
+ Gate caller;
+ Gate barrier;
+ BlockingHostResolver() : resolver(), caller(), barrier() {}
+ vespalib::string ip_address(const vespalib::string &host) override {
+ fprintf(stderr, "blocking resolve request: '%s'\n", host.c_str());
+ caller.countDown();
+ barrier.await();
+ vespalib::string result = resolver.ip_address(host);
+ fprintf(stderr, "returning resolve result: '%s'\n", result.c_str());
+ return result;
+ }
+ void wait_for_caller() { caller.await(); }
+ void release_caller() { barrier.countDown(); }
+};
+
+AsyncResolver::SP make_resolver(AsyncResolver::HostResolver::SP host_resolver) {
+ AsyncResolver::Params params;
+ params.resolver = host_resolver;
+ return AsyncResolver::create(params);
+}
+
+//-----------------------------------------------------------------------------
+
+struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
+ FNET_SimplePacketStreamer streamer;
+ FastOS_ThreadPool pool;
+ FNET_Transport transport;
+ Gate conn_lost;
+ Gate conn_deleted;
+ TransportFixture() : streamer(nullptr), pool(128 * 1024), transport(),
+ conn_lost(), conn_deleted()
+ {
+ transport.Start(&pool);
+ }
+ TransportFixture(AsyncResolver::HostResolver::SP host_resolver)
+ : streamer(nullptr), pool(128 * 1024), transport(make_resolver(std::move(host_resolver)), 1),
+ conn_lost(), conn_deleted()
+ {
+ transport.Start(&pool);
+ }
+ HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context) override {
+ ASSERT_TRUE(packet->GetCommand() == FNET_ControlPacket::FNET_CMD_CHANNEL_LOST);
+ conn_lost.countDown();
+ packet->Free();
+ return FNET_FREE_CHANNEL;
+ }
+ void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); }
+ FNET_Connection *connect(const vespalib::string &spec) {
+ FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer, this);
+ ASSERT_TRUE(conn != nullptr);
+ conn->SetCleanupHandler(this);
+ return conn;
+ }
+ ~TransportFixture() {
+ transport.ShutDown(true);
+ pool.Close();
+ }
+};
+
+//-----------------------------------------------------------------------------
+
+TEST_MT_FFF("require that normal connect works", 2,
+ ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60))
+{
+ if (thread_id == 0) {
+ SocketHandle socket = f1.accept();
+ EXPECT_TRUE(socket.valid());
+ TEST_BARRIER();
+ } else {
+ vespalib::string spec = make_string("tcp/localhost:%d", f1.address().port());
+ FNET_Connection *conn = f2.connect(spec);
+ TEST_BARRIER();
+ conn->Owner()->Close(conn);
+ EXPECT_TRUE(f2.conn_lost.await(60000));
+ EXPECT_TRUE(!f2.conn_deleted.await(20));
+ conn->SubRef();
+ EXPECT_TRUE(f2.conn_deleted.await(60000));
+ }
+}
+
+TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) {
+ FNET_Connection *conn = f1.connect("invalid");
+ EXPECT_TRUE(f1.conn_lost.await(60000));
+ EXPECT_TRUE(!f1.conn_deleted.await(20));
+ conn->SubRef();
+ EXPECT_TRUE(f1.conn_deleted.await(60000));
+}
+
+TEST_MT_FFFF("require that async close can be called before async resolve completes", 2,
+ ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()),
+ TransportFixture(f2), TimeBomb(60))
+{
+ if (thread_id == 0) {
+ SocketHandle socket = f1.accept();
+ EXPECT_TRUE(!socket.valid());
+ } else {
+ vespalib::string spec = make_string("tcp/localhost:%d", f1.address().port());
+ FNET_Connection *conn = f3.connect(spec);
+ f2->wait_for_caller();
+ conn->Owner()->Close(conn);
+ EXPECT_TRUE(f3.conn_lost.await(60000));
+ f2->release_caller();
+ EXPECT_TRUE(!f3.conn_deleted.await(20));
+ conn->SubRef();
+ EXPECT_TRUE(f3.conn_deleted.await(60000));
+ f1.shutdown();
+ }
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index 5b93c3fcd3f..ea8de81cff5 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -73,7 +73,7 @@ TEST("size of important objects")
EXPECT_EQUAL(192u, sizeof(FNET_IOComponent));
EXPECT_EQUAL(32u, sizeof(FNET_Channel));
EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock));
- EXPECT_EQUAL(480u, sizeof(FNET_Connection));
+ EXPECT_EQUAL(496u, sizeof(FNET_Connection));
EXPECT_EQUAL(96u, sizeof(FastOS_Cond));
EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer));
EXPECT_EQUAL(24u, sizeof(FastOS_Time));
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index f1ed051bdc1..180e7ea9adf 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -8,6 +8,7 @@
#include "iserveradapter.h"
#include "config.h"
#include "transport_thread.h"
+#include "transport.h"
#include <vespa/log/log.h>
LOG_SETUP(".fnet");
@@ -49,9 +50,29 @@ SyncPacket::Free()
_cond.Signal();
_cond.Unlock();
}
+}
+
+FNET_Connection::ResolveHandler::ResolveHandler(FNET_Connection *conn)
+ : connection(conn),
+ address()
+{
+ connection->AddRef();
+}
+
+void
+FNET_Connection::ResolveHandler::handle_result(vespalib::SocketAddress result)
+{
+ address = result;
+ connection->Owner()->Add(connection);
}
+FNET_Connection::ResolveHandler::~ResolveHandler()
+{
+ connection->SubRef();
+}
+
+
///////////////////////
// PROTECTED METHODS //
///////////////////////
@@ -394,6 +415,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_serverAdapter(serverAdapter),
_adminChannel(nullptr),
_socket(std::move(socket)),
+ _resolve_handler(nullptr),
_context(),
_state(FNET_CONNECTED), // <-- NB
_flags(),
@@ -422,13 +444,13 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
FNET_IPacketHandler *adminHandler,
FNET_Context adminContext,
FNET_Context context,
- vespalib::SocketHandle socket,
const char *spec)
- : FNET_IOComponent(owner, socket.get(), spec, /* time-out = */ true),
+ : FNET_IOComponent(owner, -1, spec, /* time-out = */ true),
_streamer(streamer),
_serverAdapter(serverAdapter),
_adminChannel(nullptr),
- _socket(std::move(socket)),
+ _socket(),
+ _resolve_handler(nullptr),
_context(context),
_state(FNET_CONNECTING),
_flags(),
@@ -445,7 +467,6 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_callbackTarget(nullptr),
_cleanup(nullptr)
{
- assert(_socket.valid());
if (adminHandler != nullptr) {
FNET_Channel::UP admin(new FNET_Channel(FNET_NOID, this, adminHandler, adminContext));
_adminChannel = admin.get();
@@ -484,7 +505,29 @@ FNET_Connection::Init()
}
// handle close by admin channel init
- return (_state <= FNET_CONNECTED);
+ if (_state == FNET_CLOSED) {
+ return false;
+ }
+
+ // initiate async resolve
+ if (IsClient()) {
+ _resolve_handler = std::make_shared<ResolveHandler>(this);
+ Owner()->owner().resolve_async(GetSpec(), _resolve_handler);
+ }
+ return true;
+}
+
+
+bool
+FNET_Connection::handle_add_event()
+{
+ if (_resolve_handler) {
+ auto tweak = [this](vespalib::SocketHandle &handle) { return Owner()->tune(handle); };
+ _socket = _resolve_handler->address.connect(tweak);
+ _ioc_socket_fd = _socket.get();
+ _resolve_handler.reset();
+ }
+ return _socket.valid();
}
@@ -653,6 +696,7 @@ FNET_Connection::CleanupHook()
void
FNET_Connection::Close()
{
+ _resolve_handler.reset();
detach_selector();
SetState(FNET_CLOSED);
_ioc_socket_fd = -1;
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 511d50a451e..699516554e2 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -8,6 +8,7 @@
#include "channellookup.h"
#include "packetqueue.h"
#include <vespa/vespalib/net/socket_handle.h>
+#include <vespa/vespalib/net/async_resolver.h>
class FNET_IPacketStreamer;
class FNET_IServerAdapter;
@@ -77,10 +78,19 @@ private:
bool _callbackWait;
bool _discarding;
};
+ struct ResolveHandler : public vespalib::AsyncResolver::ResultHandler {
+ FNET_Connection *connection;
+ vespalib::SocketAddress address;
+ ResolveHandler(FNET_Connection *conn);
+ void handle_result(vespalib::SocketAddress result) override;
+ ~ResolveHandler();
+ };
+ using ResolveHandlerSP = std::shared_ptr<ResolveHandler>;
FNET_IPacketStreamer *_streamer; // custom packet streamer
FNET_IServerAdapter *_serverAdapter; // only on server side
FNET_Channel *_adminChannel; // only on client side
vespalib::SocketHandle _socket; // socket for this conn
+ ResolveHandlerSP _resolve_handler; // async resolve callback
FNET_Context _context; // connection context
State _state; // connection state
Flags _flags; // Packed flags.
@@ -231,7 +241,6 @@ public:
* @param adminHandler packet handler for admin channel
* @param adminContext context for admin channel
* @param context initial context for this connection
- * @param socket the underlying socket used for IO
* @param spec connect spec
**/
FNET_Connection(FNET_TransportThread *owner,
@@ -240,7 +249,6 @@ public:
FNET_IPacketHandler *adminHandler,
FNET_Context adminContext,
FNET_Context context,
- vespalib::SocketHandle socket,
const char *spec);
/**
@@ -318,6 +326,17 @@ public:
bool Init();
/**
+ * Called by the transport thread as the initial part of adding
+ * this connection to the selection loop. If this is an incoming
+ * connection (already connected) this function does nothing. If
+ * this is an outgoing connection this function will use the async
+ * resolve result to create a socket and initiate async connect.
+ *
+ * @return false if connection broken, true otherwise.
+ **/
+ bool handle_add_event() override;
+
+ /**
* Register a cleanup handler to be invoked when this connection is
* about to be destructed.
*
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index a3c3911db54..52c9cfd09b5 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -136,6 +136,13 @@ FNET_IOComponent::EnableWriteEvent(bool enabled)
}
+bool
+FNET_IOComponent::handle_add_event()
+{
+ return true;
+}
+
+
void
FNET_IOComponent::CleanupHook()
{
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index 2e9bcb9fc0e..d0d8cee85b7 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -301,6 +301,17 @@ public:
//----------- virtual methods below ----------------------//
+ /**
+ * This function is called as the first step of adding an io
+ * component to the selection loop. The default implementation
+ * will always return true. This can be overridden to perform
+ * delayed setup in the network thread. If this function returns
+ * false, the component is broken and should be closed
+ * immediately.
+ *
+ * @return false if broken, true otherwise.
+ **/
+ virtual bool handle_add_event();
/**
* This method is called by the SubRef methods just before the
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index 4177d80706d..675489fb229 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -22,8 +22,9 @@ struct HashState {
} // namespace <unnamed>
-FNET_Transport::FNET_Transport(size_t num_threads)
- : _threads()
+FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads)
+ : _async_resolver(std::move(resolver)),
+ _threads()
{
assert(num_threads >= 1);
for (size_t i = 0; i < num_threads; ++i) {
@@ -31,7 +32,17 @@ FNET_Transport::FNET_Transport(size_t num_threads)
}
}
-FNET_Transport::~FNET_Transport() { }
+FNET_Transport::~FNET_Transport()
+{
+ _async_resolver->wait_for_pending_resolves();
+}
+
+void
+FNET_Transport::resolve_async(const vespalib::string &spec,
+ vespalib::AsyncResolver::ResultHandler::WP result_handler)
+{
+ _async_resolver->resolve_async(spec, std::move(result_handler));
+}
FNET_TransportThread *
FNET_Transport::select_thread(const void *key, size_t key_len) const
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 87dc54ddb86..8ad48d37cad 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -5,6 +5,7 @@
#include "context.h"
#include <memory>
#include <vector>
+#include <vespa/vespalib/net/async_resolver.h>
class FastOS_TimeInterface;
class FNET_TransportThread;
@@ -26,6 +27,7 @@ private:
using Thread = std::unique_ptr<FNET_TransportThread>;
using Threads = std::vector<Thread>;
+ vespalib::AsyncResolver::SP _async_resolver;
Threads _threads;
public:
@@ -36,10 +38,25 @@ public:
* the current thread become the transport thread. Main may only
* be called for single-threaded transports.
**/
- FNET_Transport(size_t num_threads = 1);
+ FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads);
+ FNET_Transport(size_t num_threads = 1)
+ : FNET_Transport(vespalib::AsyncResolver::get_shared(), num_threads) {}
~FNET_Transport();
/**
+ * Resolve a connect spec into a socket address to be used to
+ * connect to a remote socket. This operation will be performed
+ * asynchronously and the result will be given to the result
+ * handler when ready. The result handler may be discarded to
+ * cancel the request.
+ *
+ * @param spec connect spec
+ * @param result handler
+ **/
+ void resolve_async(const vespalib::string &spec,
+ vespalib::AsyncResolver::ResultHandler::WP result_handler);
+
+ /**
* Select one of the underlying transport threads. The selection
* is based on hashing the given key as well as the current stack
* pointer.
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index ddeb6dfa93d..4a637b03532 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -276,16 +276,10 @@ FNET_TransportThread::Connect(const char *spec, FNET_IPacketStreamer *streamer,
FNET_IServerAdapter *serverAdapter,
FNET_Context connContext)
{
- auto tweak = [this](SocketHandle &handle) { return tune(handle); };
- SocketHandle handle = SocketSpec(spec).client_address().connect(tweak);
- if (handle.valid()) {
- std::unique_ptr<FNET_Connection> conn = std::make_unique<FNET_Connection>(this, streamer, serverAdapter,
- adminHandler, adminContext, connContext, std::move(handle), spec);
- if (conn->Init()) {
- conn->AddRef_NoLock();
- Add(conn.get(), /*needRef = */ false);
- return conn.release();
- }
+ std::unique_ptr<FNET_Connection> conn = std::make_unique<FNET_Connection>(this, streamer, serverAdapter,
+ adminHandler, adminContext, connContext, spec);
+ if (conn->Init()) {
+ return conn.release();
}
return nullptr;
}
@@ -460,9 +454,14 @@ FNET_TransportThread::handle_wakeup()
switch (packet->GetCommand()) {
case FNET_ControlPacket::FNET_CMD_IOC_ADD:
- AddComponent(context._value.IOC);
- context._value.IOC->_flags._ioc_added = true;
- context._value.IOC->attach_selector(_selector);
+ if (context._value.IOC->handle_add_event()) {
+ AddComponent(context._value.IOC);
+ context._value.IOC->_flags._ioc_added = true;
+ context._value.IOC->attach_selector(_selector);
+ } else {
+ context._value.IOC->Close();
+ AddDeleteComponent(context._value.IOC);
+ }
break;
case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_READ:
context._value.IOC->EnableReadEvent(true);