diff options
author | Håvard Pettersen <havardpe@oath.com> | 2019-02-19 12:57:36 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2019-02-21 16:52:52 +0000 |
commit | 4b6a4cbd809b23749ddcf1a39fc0548c0882abe2 (patch) | |
tree | d95f327e94ad1cab03dbe9847d35ea603c3ac4a5 /fnet | |
parent | 69e4f91fa50744e4709410da488ae47fb04c3335 (diff) |
async tls handshake work
Diffstat (limited to 'fnet')
-rw-r--r-- | fnet/src/tests/connect/connect_test.cpp | 89 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp | 4 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp | 37 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 40 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 16 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/controlpacket.cpp | 3 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/controlpacket.h | 2 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.cpp | 5 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.h | 12 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.cpp | 12 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 14 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 17 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 18 |
13 files changed, 246 insertions, 23 deletions
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 5e48390a297..b70b3fa8b01 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -3,11 +3,14 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/fnet/fnet.h> #include <vespa/vespalib/net/server_socket.h> +#include <vespa/vespalib/net/crypto_engine.h> #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/stringfmt.h> using namespace vespalib; +int short_time = 20; // ms + struct BlockingHostResolver : public AsyncResolver::HostResolver { AsyncResolver::SimpleHostResolver resolver; Gate caller; @@ -33,6 +36,43 @@ AsyncResolver::SP make_resolver(AsyncResolver::HostResolver::SP host_resolver) { //----------------------------------------------------------------------------- +struct BlockingCryptoSocket : public CryptoSocket { + SocketHandle socket; + Gate &handshake_work_enter; + Gate &handshake_work_exit; + Gate &handshake_socket_deleted; + BlockingCryptoSocket(SocketHandle s, Gate &hs_work_enter, Gate &hs_work_exit, Gate &hs_socket_deleted) + : socket(std::move(s)), handshake_work_enter(hs_work_enter), handshake_work_exit(hs_work_exit), + handshake_socket_deleted(hs_socket_deleted) {} + ~BlockingCryptoSocket() override { + handshake_socket_deleted.countDown(); + } + int get_fd() const override { return socket.get(); } + HandshakeResult handshake() override { return HandshakeResult::NEED_WORK; } + void do_handshake_work() override { + handshake_work_enter.countDown(); + handshake_work_exit.await(); + } + size_t min_read_buffer_size() const override { return 1; } + ssize_t read(char *buf, size_t len) override { return socket.read(buf, len); } + ssize_t drain(char *, size_t) override { return 0; } + ssize_t write(const char *buf, size_t len) override { return socket.write(buf, len); } + ssize_t flush() override { return 0; } + ssize_t half_close() override { return socket.half_close(); } +}; + +struct 
BlockingCryptoEngine : public CryptoEngine { + Gate handshake_work_enter; + Gate handshake_work_exit; + Gate handshake_socket_deleted; + CryptoSocket::UP create_crypto_socket(SocketHandle socket, bool) override { + return std::make_unique<BlockingCryptoSocket>(std::move(socket), + handshake_work_enter, handshake_work_exit, handshake_socket_deleted); + } +}; + +//----------------------------------------------------------------------------- + struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { FNET_SimplePacketStreamer streamer; FastOS_ThreadPool pool; @@ -50,6 +90,12 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { { transport.Start(&pool); } + TransportFixture(CryptoEngine::SP crypto) + : streamer(nullptr), pool(128 * 1024), transport(crypto, 1), + conn_lost(), conn_deleted() + { + transport.Start(&pool); + } HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context) override { ASSERT_TRUE(packet->GetCommand() == FNET_ControlPacket::FNET_CMD_CHANNEL_LOST); conn_lost.countDown(); @@ -83,19 +129,19 @@ TEST_MT_FFF("require that normal connect works", 2, FNET_Connection *conn = f2.connect(spec); TEST_BARRIER(); conn->Owner()->Close(conn); - EXPECT_TRUE(f2.conn_lost.await(60000)); - EXPECT_TRUE(!f2.conn_deleted.await(20)); + f2.conn_lost.await(); + EXPECT_TRUE(!f2.conn_deleted.await(short_time)); conn->SubRef(); - EXPECT_TRUE(f2.conn_deleted.await(60000)); + f2.conn_deleted.await(); } } TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) { FNET_Connection *conn = f1.connect("invalid"); - EXPECT_TRUE(f1.conn_lost.await(60000)); - EXPECT_TRUE(!f1.conn_deleted.await(20)); + f1.conn_lost.await(); + EXPECT_TRUE(!f1.conn_deleted.await(short_time)); conn->SubRef(); - EXPECT_TRUE(f1.conn_deleted.await(60000)); + f1.conn_deleted.await(); } TEST_MT_FFFF("require that async close can be called before async resolve completes", 2, @@ -110,13 +156,38 @@ TEST_MT_FFFF("require that 
async close can be called before async resolve comple FNET_Connection *conn = f3.connect(spec); f2->wait_for_caller(); conn->Owner()->Close(conn); - EXPECT_TRUE(f3.conn_lost.await(60000)); + f3.conn_lost.await(); f2->release_caller(); - EXPECT_TRUE(!f3.conn_deleted.await(20)); + EXPECT_TRUE(!f3.conn_deleted.await(short_time)); conn->SubRef(); - EXPECT_TRUE(f3.conn_deleted.await(60000)); + f3.conn_deleted.await(); f1.shutdown(); } } +TEST_MT_FFFF("require that async close during async do_handshake_work works", 2, + ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()), + TransportFixture(f2), TimeBomb(60)) +{ + if (thread_id == 0) { + SocketHandle socket = f1.accept(); + EXPECT_TRUE(socket.valid()); + TEST_BARRIER(); // #1 + } else { + vespalib::string spec = make_string("tcp/localhost:%d", f1.address().port()); + FNET_Connection *conn = f3.connect(spec); + f2->handshake_work_enter.await(); + conn->Owner()->Close(conn, false); + conn = nullptr; // ref given away + f3.conn_lost.await(); + TEST_BARRIER(); // #1 + // verify that pending work keeps relevant objects alive + EXPECT_TRUE(!f3.conn_deleted.await(short_time)); + EXPECT_TRUE(!f2->handshake_socket_deleted.await(short_time)); + f2->handshake_work_exit.countDown(); + f3.conn_deleted.await(); + f2->handshake_socket_deleted.await(); + } +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 31aec84afd5..2441ea6eaa0 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -89,13 +89,13 @@ void perform_test(size_t thread_id, Client &client, Result &result) { req = client.orb.AllocRPCRequest(req); req->SetMethodName("inc"); req->GetParams()->AddInt64(seq); - target->InvokeSync(req, 60.0); + target->InvokeSync(req, 300.0); ASSERT_TRUE(req->CheckReturnTypes("l")); uint64_t ret = 
req->GetReturn()->GetValue(0)._intval64; EXPECT_EQUAL(ret, seq + 1); seq = ret; }; - size_t loop_cnt = 128; + size_t loop_cnt = 8; BenchmarkTimer::benchmark(invoke, invoke, 0.5); BenchmarkTimer timer(1.5); while (timer.has_budget()) { diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp index b3e613de93f..0e5f4712e61 100644 --- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp +++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp @@ -59,6 +59,22 @@ struct StartCmp { } }; +vespalib::string get_prefix(const std::vector<TimeTracer::Record> &stats, size_t idx) { + vespalib::string prefix; + TimeTracer::Record self = stats[idx]; + while (idx-- > 0) { + if (stats[idx].thread_id == self.thread_id) { + if (stats[idx].stop > self.start) { + prefix.append("..."); + } + } + } + if (!prefix.empty()) { + prefix.append(" "); + } + return prefix; +} + void benchmark_rpc(Fixture &fixture, bool reconnect) { uint64_t seq = 0; FRT_Target *target = fixture.connect(); @@ -95,24 +111,25 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) { ASSERT_TRUE(stats.size() > 0); std::sort(stats.begin(), stats.end(), StartCmp()); fprintf(stderr, "===== time line BEGIN =====\n"); - for (const auto &entry: stats) { + for (size_t i = 0; i < stats.size(); ++i) { + const auto &entry = stats[i]; double abs_start = std::chrono::duration<double, std::milli>(entry.start - med_sample.start).count(); double abs_stop = std::chrono::duration<double, std::milli>(entry.stop - med_sample.start).count(); - fprintf(stderr, "[%g, %g] [%u:%s] %g ms\n", abs_start, abs_stop, entry.thread_id, entry.tag_name().c_str(), entry.ms_duration()); + fprintf(stderr, "%s[%g, %g] [%u:%s] %g ms\n", get_prefix(stats, i).c_str(), abs_start, abs_stop, + entry.thread_id, entry.tag_name().c_str(), entry.ms_duration()); } fprintf(stderr, "===== time line END =====\n"); - std::sort(stats.begin(), stats.end(), DurationCmp()); - ASSERT_TRUE(stats.back().tag_id == 
req_tag.id()); - double rest_ms = stats.back().ms_duration(); - while (!stats.empty() && stats.back().ms_duration() > 1.0) { - const auto &entry = stats.back(); + std::vector<TimeTracer::Record> high_duration; + for (const auto &entry: stats) { + if (entry.ms_duration() > 1.0) { + high_duration.push_back(entry); + } + } + for (const auto &entry: high_duration) { if (entry.tag_id != req_tag.id()) { fprintf(stderr, "WARNING: high duration: [%u:%s] %g ms\n", entry.thread_id, entry.tag_name().c_str(), entry.ms_duration()); - rest_ms -= entry.ms_duration(); } - stats.pop_back(); } - fprintf(stderr, "INFO: total non-critical overhead: %g ms\n", rest_ms); } TEST_F("^^^-- rpc with null encryption", Fixture(null_crypto)) { diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index 1a5834d6e96..74a7d19387c 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -51,6 +51,26 @@ SyncPacket::Free() _cond.notify_one(); } } + + +struct DoHandshakeWork : vespalib::Executor::Task { + FNET_Connection *conn; + vespalib::CryptoSocket *socket; + DoHandshakeWork(FNET_Connection *conn_in, vespalib::CryptoSocket *socket_in) + : conn(conn_in), socket(socket_in) + { + conn->AddRef(); + } + void run() override { + socket->do_handshake_work(); + conn->Owner()->handshake_act(conn, false); + conn = nullptr; // ref given away above + } + ~DoHandshakeWork() { + assert(conn == nullptr); + } +}; + } @@ -216,6 +236,9 @@ bool FNET_Connection::handshake() { bool broken = false; + if (_flags._handshake_work_pending) { + return !broken; + } switch (_socket->handshake()) { case vespalib::CryptoSocket::HandshakeResult::FAIL: LOG(debug, "Connection(%s): handshake failed with peer %s", GetSpec(), GetPeerSpec().c_str()); @@ -247,6 +270,12 @@ FNET_Connection::handshake() EnableReadEvent(false); EnableWriteEvent(true); break; + case vespalib::CryptoSocket::HandshakeResult::NEED_WORK: + EnableReadEvent(false); + EnableWriteEvent(false); + 
assert(!_flags._handshake_work_pending); + _flags._handshake_work_pending = true; + Owner()->owner().post_or_perform(std::make_unique<DoHandshakeWork>(this, _socket.get())); } return !broken; } @@ -552,6 +581,13 @@ FNET_Connection::handle_add_event() return (_socket && (_socket->get_fd() >= 0)); } +bool +FNET_Connection::handle_handshake_act() +{ + assert(_flags._handshake_work_pending); + _flags._handshake_work_pending = false; + return ((_state == FNET_CONNECTING) && handshake()); +} void FNET_Connection::SetCleanupHandler(FNET_IConnectionCleanupHandler *handler) @@ -695,7 +731,9 @@ FNET_Connection::Close() detach_selector(); SetState(FNET_CLOSED); _ioc_socket_fd = -1; - _socket.reset(); + if (!_flags._handshake_work_pending) { + _socket.reset(); + } } diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 40f826243bf..b52d0147ce1 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -71,13 +71,15 @@ private: _inCallback(false), _callbackWait(false), _discarding(false), - _framed(false) + _framed(false), + _handshake_work_pending(false) { } bool _gotheader; bool _inCallback; bool _callbackWait; bool _discarding; bool _framed; + bool _handshake_work_pending; }; struct ResolveHandler : public vespalib::AsyncResolver::ResultHandler { FNET_Connection *connection; @@ -357,6 +359,18 @@ public: bool handle_add_event() override; /** + * This function is called by the transport thread to handle the + * completion of an asynchronous invocation of + * 'do_handshake_work'. This function will try to progress + * connection handshaking further, based on the work performed by + * 'do_handshake_work'. If this function returns false, the + * component is broken and should be closed immediately. + * + * @return false if broken, true otherwise. + **/ + bool handle_handshake_act() override; + + /** * Register a cleanup handler to be invoked when this connection is * about to be destructed. 
* diff --git a/fnet/src/vespa/fnet/controlpacket.cpp b/fnet/src/vespa/fnet/controlpacket.cpp index f4d10e48353..e2ca5fdcf8b 100644 --- a/fnet/src/vespa/fnet/controlpacket.cpp +++ b/fnet/src/vespa/fnet/controlpacket.cpp @@ -101,6 +101,9 @@ FNET_ControlPacket FNET_ControlPacket::IOCDisableWrite(FNET_CMD_IOC_DISABLE_WRITE); FNET_ControlPacket +FNET_ControlPacket::IOCHandshakeACT(FNET_CMD_IOC_HANDSHAKE_ACT); + +FNET_ControlPacket FNET_ControlPacket::IOCClose(FNET_CMD_IOC_CLOSE); FNET_ControlPacket diff --git a/fnet/src/vespa/fnet/controlpacket.h b/fnet/src/vespa/fnet/controlpacket.h index 5e6eb8a564b..c69cb8e086b 100644 --- a/fnet/src/vespa/fnet/controlpacket.h +++ b/fnet/src/vespa/fnet/controlpacket.h @@ -39,6 +39,7 @@ public: FNET_CMD_IOC_DISABLE_READ, FNET_CMD_IOC_ENABLE_WRITE, FNET_CMD_IOC_DISABLE_WRITE, + FNET_CMD_IOC_HANDSHAKE_ACT, FNET_CMD_IOC_CLOSE, FNET_CMD_EXECUTE, FNET_CMD_TIMEOUT, @@ -54,6 +55,7 @@ public: static FNET_ControlPacket IOCDisableRead; static FNET_ControlPacket IOCEnableWrite; static FNET_ControlPacket IOCDisableWrite; + static FNET_ControlPacket IOCHandshakeACT; static FNET_ControlPacket IOCClose; static FNET_ControlPacket Execute; static FNET_ControlPacket Timeout; diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index d4244cbf204..0ecf63ea642 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -139,6 +139,11 @@ FNET_IOComponent::handle_add_event() return true; } +bool +FNET_IOComponent::handle_handshake_act() +{ + return true; +} void FNET_IOComponent::CleanupHook() diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index 901c3d1a5d0..1a50ccbca73 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -197,6 +197,18 @@ public: virtual bool handle_add_event(); /** + * This function is called by the transport thread to handle the + * completion of an asynchronous invocation of + * 'do_handshake_work'. 
This functionality is used by TLS + * connections in order to move expensive cpu work out of the + * transport thread. If this function returns false, the component + * is broken and should be closed immediately. + * + * @return false if broken, true otherwise. + **/ + virtual bool handle_handshake_act(); + + /** * This method is called by the SubRef methods just before the * object is deleted. It may be used to perform cleanup tasks that * must be done before the destructor is invoked. diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index fdc3576db2f..4db2a33c78c 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -20,11 +20,14 @@ struct HashState { key_hash(XXH64(key, key_len, 0)) {} }; +VESPA_THREAD_STACK_TAG(fnet_work_pool); + } // namespace <unnamed> FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads) : _async_resolver(std::move(resolver)), _crypto_engine(std::move(crypto)), + _work_pool(1, 128 * 1024, fnet_work_pool, 1024), _threads() { assert(num_threads >= 1); @@ -36,6 +39,15 @@ FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::C FNET_Transport::~FNET_Transport() { _async_resolver->wait_for_pending_resolves(); + _work_pool.shutdown().sync(); +} + +void +FNET_Transport::post_or_perform(vespalib::Executor::Task::UP task) +{ + if (auto rejected = _work_pool.execute(std::move(task))) { + rejected->run(); + } } void diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 4624c183b01..27264aa1965 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -7,6 +7,7 @@ #include <vector> #include <vespa/vespalib/net/async_resolver.h> #include <vespa/vespalib/net/crypto_engine.h> +#include <vespa/vespalib/util/threadstackexecutor.h> class FastOS_TimeInterface; class FNET_TransportThread; @@ -30,6 +31,7 @@ private: vespalib::AsyncResolver::SP 
_async_resolver; vespalib::CryptoEngine::SP _crypto_engine; + vespalib::ThreadStackExecutor _work_pool; Threads _threads; public: @@ -53,6 +55,18 @@ public: ~FNET_Transport(); /** + * Try to execute the given task on the internal work pool + * executor (post). If the executor has been closed or there is + * too much pending work the task is run in the context of the + * current thread instead (perform). The idea is to move work away + * from the transport threads as long as pending work is not + * piling up. + * + * @param task work to be done + **/ + void post_or_perform(vespalib::Executor::Task::UP task); + + /** * Resolve a connect spec into a socket address to be used to * connect to a remote socket. This operation will be performed * asynchronously and the result will be given to the result diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 8ee33ad7960..a5cf15c3939 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -144,6 +144,7 @@ FNET_TransportThread::DiscardEvent(FNET_ControlPacket *cpacket, case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_READ: case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE: case FNET_ControlPacket::FNET_CMD_IOC_DISABLE_WRITE: + case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT: case FNET_ControlPacket::FNET_CMD_IOC_CLOSE: context._value.IOC->SubRef(); break; @@ -344,6 +345,15 @@ FNET_TransportThread::DisableWrite(FNET_IOComponent *comp, bool needRef) FNET_Context(comp)); } +void +FNET_TransportThread::handshake_act(FNET_IOComponent *comp, bool needRef) +{ + if (needRef) { + comp->AddRef(); + } + PostEvent(&FNET_ControlPacket::IOCHandshakeACT, + FNET_Context(comp)); +} void FNET_TransportThread::Close(FNET_IOComponent *comp, bool needRef) @@ -481,6 +491,13 @@ FNET_TransportThread::handle_wakeup() context._value.IOC->EnableWriteEvent(false); context._value.IOC->SubRef(); break; + case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT: + 
if (context._value.IOC->handle_handshake_act()) { + context._value.IOC->SubRef(); + } else { + handle_close_cmd(context._value.IOC); + } + break; case FNET_ControlPacket::FNET_CMD_IOC_CLOSE: handle_close_cmd(context._value.IOC); break; diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index ffbbb7acc0f..dd135502afe 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -414,6 +414,24 @@ public: /** + * Signal the completion of an asynchronous handshake operation for + * the given io component. Note that the actual work is performed + * by the transport thread. This method simply posts an event on + * the transport thread event queue. NOTE: in order to post async + * events regarding I/O components, an extra reference to the + * component needs to be allocated. The needRef flag indicates + * whether the caller already has done this. + * + * @param comp the component to signal about operation completion + * @param needRef should be set to false if the caller of this + * method already has obtained an extra reference to the + * component. If this flag is true, this method will call the + * AddRef method on the component. + **/ + void handshake_act(FNET_IOComponent *comp, bool needRef = true); + + + /** * Close an I/O component and remove it from the working set of this * transport object. Note that the actual work is performed by the * transport thread. This method simply posts an event on the |