diff options
author | Håvard Pettersen <havardpe@oath.com> | 2022-04-04 12:27:34 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2022-04-04 12:27:34 +0000 |
commit | c7b37e2ef7322a9cc7caf387ec7c33d9a6da72db (patch) | |
tree | 584e6b75f5ba7638b61a02d752a9337439b1f615 | |
parent | de78c035b025b946b98039936ed134a9820b2ccd (diff) |
more robust supervisor detachment
-rw-r--r-- | fnet/src/vespa/fnet/controlpacket.cpp | 5 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/controlpacket.h | 6 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/supervisor.cpp | 6 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.cpp | 17 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 17 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 33 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 23 |
7 files changed, 67 insertions, 40 deletions
diff --git a/fnet/src/vespa/fnet/controlpacket.cpp b/fnet/src/vespa/fnet/controlpacket.cpp index 9ff69a76210..9aa03ad5e3a 100644 --- a/fnet/src/vespa/fnet/controlpacket.cpp +++ b/fnet/src/vespa/fnet/controlpacket.cpp @@ -95,7 +95,10 @@ FNET_ControlPacket FNET_ControlPacket::IOCClose(FNET_CMD_IOC_CLOSE); FNET_ControlPacket -FNET_ControlPacket::DetachServerAdapter(FNET_CMD_DETACH_SERVER_ADAPTER); +FNET_ControlPacket::DetachServerAdapterInit(FNET_CMD_DETACH_SERVER_ADAPTER_INIT); + +FNET_ControlPacket +FNET_ControlPacket::DetachServerAdapterFini(FNET_CMD_DETACH_SERVER_ADAPTER_FINI); FNET_ControlPacket FNET_ControlPacket::Execute(FNET_CMD_EXECUTE); diff --git a/fnet/src/vespa/fnet/controlpacket.h b/fnet/src/vespa/fnet/controlpacket.h index 8dd2d034ae6..ad846d37c30 100644 --- a/fnet/src/vespa/fnet/controlpacket.h +++ b/fnet/src/vespa/fnet/controlpacket.h @@ -38,7 +38,8 @@ public: FNET_CMD_IOC_ENABLE_WRITE, FNET_CMD_IOC_HANDSHAKE_ACT, FNET_CMD_IOC_CLOSE, - FNET_CMD_DETACH_SERVER_ADAPTER, + FNET_CMD_DETACH_SERVER_ADAPTER_INIT, + FNET_CMD_DETACH_SERVER_ADAPTER_FINI, FNET_CMD_EXECUTE, FNET_CMD_TIMEOUT, FNET_CMD_BAD_PACKET, @@ -51,7 +52,8 @@ public: static FNET_ControlPacket IOCEnableWrite; static FNET_ControlPacket IOCHandshakeACT; static FNET_ControlPacket IOCClose; - static FNET_ControlPacket DetachServerAdapter; + static FNET_ControlPacket DetachServerAdapterInit; + static FNET_ControlPacket DetachServerAdapterFini; static FNET_ControlPacket Execute; static FNET_ControlPacket Timeout; static FNET_ControlPacket BadPacket; diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index 0a31b9d882b..1681321b239 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -30,12 +30,10 @@ FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport) FRT_Supervisor::~FRT_Supervisor() { + _transport->detach(this); if (_connector != nullptr) { - _connector->Owner()->Close(_connector, /* needref */ false); + _connector->SubRef(); } - _transport->wait_for_pending_resolves(); - _transport->detach(this); - _transport->sync(); } FNET_Scheduler * diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index 0a79f324bb9..09d4704ebbb 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -125,6 +125,11 @@ TransportConfig::time_tools() const { } // fnet +void +FNET_Transport::wait_for_pending_resolves() { + _async_resolver->wait_for_pending_resolves(); +} + FNET_Transport::FNET_Transport(const fnet::TransportConfig &cfg) : _async_resolver(cfg.resolver()), _crypto_engine(cfg.crypto()), @@ -158,11 +163,6 @@ FNET_Transport::resolve_async(const vespalib::string &spec, _async_resolver->resolve_async(spec, std::move(result_handler)); } -void -FNET_Transport::wait_for_pending_resolves() { - _async_resolver->wait_for_pending_resolves(); -} - vespalib::CryptoSocket::UP FNET_Transport::create_client_crypto_socket(vespalib::SocketHandle socket, const vespalib::SocketSpec &spec) { @@ -221,8 +221,13 @@ void FNET_Transport::detach(FNET_IServerAdapter *server_adapter) { for (const auto &thread: _threads) { - thread->detach(server_adapter); + thread->init_detach(server_adapter); + } + wait_for_pending_resolves(); + for (const auto &thread: _threads) { + thread->fini_detach(server_adapter); } + sync(); } FNET_Scheduler * diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 7dbfd80dfe7..d6e4aefb02b 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -113,6 +113,11 @@ private: Threads _threads; const FNET_Config _config; + /** + * Wait for all pending resolve requests. + **/ + void wait_for_pending_resolves(); + public: FNET_Transport(const FNET_Transport &) = delete; FNET_Transport & operator = (const FNET_Transport &) = delete; @@ -160,11 +165,6 @@ public: vespalib::AsyncResolver::ResultHandler::WP result_handler); /** - * Wait for all pending resolve requests. - **/ - void wait_for_pending_resolves(); - - /** * Wrap a plain socket endpoint (client side) in a CryptoSocket. The * implementation will be determined by the CryptoEngine used by * this Transport. @@ -258,11 +258,8 @@ public: * Detach a server adapter from this transport. * * This will close all connectors and connections referencing the - * server adapter. Note that this is an async - * operation. 'wait_for_pending_resolves' should be called before - * this to make sure any in-flight connections are added - * first. 'sync' should be called after this to drain any pending - * call-backs. + * server adapter. Note that this function will also synchronize + * with async address resolving and underlying transport threads. **/ void detach(FNET_IServerAdapter *server_adapter); diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 53363740bae..d46d174c670 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -162,7 +162,7 @@ FNET_TransportThread::SafeDiscardEvent(FNET_ControlPacket *cpacket, void FNET_TransportThread::handle_add_cmd(FNET_IOComponent *ioc) { - if (ioc->handle_add_event()) { + if ((_detaching.count(ioc->server_adapter()) == 0) && ioc->handle_add_event()) { AddComponent(ioc); ioc->_flags._ioc_added = true; ioc->attach_selector(_selector); @@ -186,8 +186,9 @@ FNET_TransportThread::handle_close_cmd(FNET_IOComponent *ioc) void -FNET_TransportThread::handle_detach_server_adapter_cmd(FNET_IServerAdapter *server_adapter) +FNET_TransportThread::handle_detach_server_adapter_init_cmd(FNET_IServerAdapter *server_adapter) { + _detaching.insert(server_adapter); FNET_IOComponent *component = _componentsHead; while (component != nullptr) { FNET_IOComponent *tmp = component; @@ -201,6 +202,12 @@ FNET_TransportThread::handle_detach_server_adapter_cmd(FNET_IServerAdapter *serv } +void +FNET_TransportThread::handle_detach_server_adapter_fini_cmd(FNET_IServerAdapter *server_adapter) +{ + _detaching.erase(server_adapter); +} + extern "C" { static void pipehandler(int) @@ -241,7 +248,8 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) _pseudo_thread(), _started(false), _shutdown(false), - _finished(false) + _finished(false), + _detaching() { trapsigpipe(); } @@ -348,9 +356,15 @@ FNET_TransportThread::Close(FNET_IOComponent *comp, bool needRef) } void -FNET_TransportThread::detach(FNET_IServerAdapter *server_adapter) +FNET_TransportThread::init_detach(FNET_IServerAdapter *server_adapter) { - PostEvent(&FNET_ControlPacket::DetachServerAdapter, FNET_Context(server_adapter)); + PostEvent(&FNET_ControlPacket::DetachServerAdapterInit, FNET_Context(server_adapter)); +} + +void +FNET_TransportThread::fini_detach(FNET_IServerAdapter *server_adapter) +{ + PostEvent(&FNET_ControlPacket::DetachServerAdapterFini, FNET_Context(server_adapter)); } bool @@ -432,8 +446,13 @@ FNET_TransportThread::handle_wakeup() continue; } - if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_DETACH_SERVER_ADAPTER) { - handle_detach_server_adapter_cmd(context._value.SERVER_ADAPTER); + if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_DETACH_SERVER_ADAPTER_INIT) { + handle_detach_server_adapter_init_cmd(context._value.SERVER_ADAPTER); + continue; + } + + if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_DETACH_SERVER_ADAPTER_FINI) { + handle_detach_server_adapter_fini_cmd(context._value.SERVER_ADAPTER); continue; } diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index c120894ac9c..b507c5dc31d 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -13,6 +13,7 @@ #include <mutex> #include <condition_variable> #include <chrono> +#include <set> namespace fnet { struct TimeTools; } class FNET_Transport; @@ -51,6 +52,7 @@ private: std::atomic<bool> _started; // event loop started ? std::atomic<bool> _shutdown; // should stop event loop ? std::atomic<bool> _finished; // event loop stopped ? + std::set<FNET_IServerAdapter*> _detaching; // server adapters being detached /** * Add an IOComponent to the list of components. This operation is @@ -143,7 +145,8 @@ private: void handle_add_cmd(FNET_IOComponent *ioc); void handle_close_cmd(FNET_IOComponent *ioc); - void handle_detach_server_adapter_cmd(FNET_IServerAdapter *server_adapter); + void handle_detach_server_adapter_init_cmd(FNET_IServerAdapter *server_adapter); + void handle_detach_server_adapter_fini_cmd(FNET_IServerAdapter *server_adapter); /** * This method is called to initialize the transport thread event @@ -336,16 +339,16 @@ public: void Close(FNET_IOComponent *comp, bool needRef = true); /** - * Detach a server adapter from this transport. - * - * This will close all connectors and connections referencing the - * server adapter. Note that this is an async - * operation. 'wait_for_pending_resolves' (on the owning - * Transport) should be called before this to make sure any - * in-flight connections are added first. 'sync' should be called - * after this to drain any pending call-backs. + * Start the operation of detaching a server adapter from this + * transport. + **/ + void init_detach(FNET_IServerAdapter *server_adapter); + + /** + * Complete the operation of detaching a server adapter from this + * transport. **/ - void detach(FNET_IServerAdapter *server_adapter); + void fini_detach(FNET_IServerAdapter *server_adapter); /** * Post an execution event on the transport event queue. The return |