diff options
author | Håvard Pettersen <havardpe@oath.com> | 2022-03-29 08:23:03 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2022-03-30 13:03:33 +0000 |
commit | 0d893528b601c6165bb010da0b11018e36a259fa (patch) | |
tree | 83f99e11ac8ee825b83baeec6202b9b52fcca596 /fnet | |
parent | 58200e0e03baf5fc712fd28313622bc8b4515dca (diff) |
enable detaching a supervisor from a running transport
This is done by closing all connectors and connections related to the
supervisor (via the server adapter interface). Also; the packet
streamer was made a singleton to avoid additional (unneeded)
references to the supervisor object.
Diffstat (limited to 'fnet')
-rw-r--r-- | fnet/CMakeLists.txt | 1 | ||||
-rw-r--r-- | fnet/src/tests/frt/detach_supervisor/CMakeLists.txt | 8 | ||||
-rw-r--r-- | fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp | 188 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 5 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 2 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connector.cpp | 7 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connector.h | 2 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/context.h | 18 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/controlpacket.cpp | 3 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/controlpacket.h | 2 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/supervisor.cpp | 18 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/supervisor.h | 4 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.h | 9 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.cpp | 17 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 17 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 26 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 12 |
17 files changed, 323 insertions, 16 deletions
diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt index 8b21777a3d5..eeef7f63876 100644 --- a/fnet/CMakeLists.txt +++ b/fnet/CMakeLists.txt @@ -17,6 +17,7 @@ vespa_define_module( src/tests/connection_spread src/tests/databuffer src/tests/examples + src/tests/frt/detach_supervisor src/tests/frt/method_pt src/tests/frt/parallel_rpc src/tests/frt/rpc diff --git a/fnet/src/tests/frt/detach_supervisor/CMakeLists.txt b/fnet/src/tests/frt/detach_supervisor/CMakeLists.txt new file mode 100644 index 00000000000..e434177d331 --- /dev/null +++ b/fnet/src/tests/frt/detach_supervisor/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(fnet_detach_supervisor_test_app TEST + SOURCES + detach_supervisor_test.cpp + DEPENDS + fnet +) +vespa_add_test(NAME fnet_detach_supervisor_test_app COMMAND fnet_detach_supervisor_test_app) diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp new file mode 100644 index 00000000000..9f251d4d491 --- /dev/null +++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp @@ -0,0 +1,188 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/transport_thread.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/vespalib/net/crypto_engine.h> +#include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/time.h> +#include <vespa/fastos/thread.h> + +using namespace vespalib; +using vespalib::make_string_short::fmt; + +CryptoEngine::SP null_crypto = std::make_shared<NullCryptoEngine>(); + +struct BasicFixture { + FastOS_ThreadPool thread_pool; + FNET_Transport transport; + BasicFixture() : thread_pool(128_Ki), transport(fnet::TransportConfig(4).crypto(null_crypto)) { + ASSERT_TRUE(transport.Start(&thread_pool)); + } + ~BasicFixture() { + transport.ShutDown(true); + thread_pool.Close(); + } +}; + +struct RpcFixture : FRT_Invokable { + FRT_Supervisor orb; + std::atomic<FNET_Connection *> back_conn; + RpcFixture(BasicFixture &basic) : orb(&basic.transport), back_conn(nullptr) { + init_rpc(); + ASSERT_TRUE(orb.Listen(0)); + } + ~RpcFixture() { + if (back_conn.load() != nullptr) { + back_conn.load()->SubRef(); + } + } + uint32_t port() const { return orb.GetListenPort(); } + FRT_Target *connect(uint32_t port) { + return orb.GetTarget(port); + } + void init_rpc() { + FRT_ReflectionBuilder rb(&orb); + rb.DefineMethod("inc", "l", "l", FRT_METHOD(RpcFixture::rpc_inc), this); + rb.MethodDesc("increment a 64-bit integer"); + rb.ParamDesc("in", "an integer (64 bit)"); + rb.ReturnDesc("out", "in + 1 (64 bit)"); + rb.DefineMethod("connect", "", "", FRT_METHOD(RpcFixture::rpc_connect), this); + rb.MethodDesc("capture 2way connection"); + } + void rpc_inc(FRT_RPCRequest *req) { + FRT_Values ¶ms = *req->GetParams(); + FRT_Values &ret = *req->GetReturn(); + ret.AddInt64(params[0]._intval64 + 1); + } + void rpc_connect(FRT_RPCRequest *req) { + ASSERT_TRUE(back_conn.load() == nullptr); + back_conn.store(req->GetConnection()); + ASSERT_TRUE(back_conn.load() != nullptr); + back_conn.load()->AddRef(); + } + FRT_Target *meta_connect(uint32_t port) { + auto *target = orb.Get2WayTarget(fmt("tcp/localhost:%u", port).c_str()); + auto *req = orb.AllocRPCRequest(); + req->SetMethodName("connect"); + target->InvokeSync(req, 300.0); + ASSERT_TRUE(req->CheckReturnTypes("")); + req->SubRef(); + return target; + }; + int check_result(FRT_RPCRequest *req, uint64_t expect) { + int num_ok = 0; + if (!req->CheckReturnTypes("l")) { + ASSERT_EQUAL(req->GetErrorCode(), FRTE_RPC_CONNECTION); + } else { + uint64_t ret = req->GetReturn()->GetValue(0)._intval64; + ASSERT_EQUAL(ret, expect); + ++num_ok; + } + req->SubRef(); + return num_ok; + } + int verify_rpc(FNET_Connection *conn) { + auto *req = orb.AllocRPCRequest(); + req->SetMethodName("inc"); + req->GetParams()->AddInt64(7); + FRT_Supervisor::InvokeSync(conn->Owner()->GetScheduler(), conn, req, 300.0); + return check_result(req, 8); + } + int verify_rpc(FRT_Target *target) { + auto *req = orb.AllocRPCRequest(); + req->SetMethodName("inc"); + req->GetParams()->AddInt64(4); + target->InvokeSync(req, 300.0); + return check_result(req, 5); + } + int verify_rpc(FRT_Target *target, uint32_t port) { + auto *my_target = connect(port); + int num_ok = verify_rpc(target) + verify_rpc(my_target) + verify_rpc(back_conn.load()); + my_target->SubRef(); + return num_ok; + } +}; + +// test timeline: +// +// listen and export server ports +// --- #1 --- +// connect to target peer +// --- #2 --- +// verify that rpc works (persistent, transient, 2way) +// --- #3 --- +// detach supervisor while talking to it +// --- #4 --- +// verify that non-detached supervisor still works +// --- #5 --- +// test cleanup + +TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, BasicFixture(), uint32_t(), uint32_t(), uint32_t(), uint32_t()) { + if (thread_id == 0) { // server 1 (talks to client 1) + auto self = std::make_unique<RpcFixture>(f1); + f2 = self->port(); + TEST_BARRIER(); // #1 + auto *target = self->meta_connect(f4); + auto *client_target = self->connect(f3); + TEST_BARRIER(); // #2 + TEST_BARRIER(); // #3 + std::this_thread::sleep_for(50ms); + self.reset(); // <--- detach supervisor for server 1 + TEST_BARRIER(); // #4 + EXPECT_EQUAL(self->verify_rpc(target), 0); // outgoing 2way target should be closed + EXPECT_EQUAL(self->verify_rpc(client_target), 1); // pure client target should not be closed + TEST_BARRIER(); // #5 + target->SubRef(); + client_target->SubRef(); + } else if (thread_id == 1) { // server 2 (talks to client 2) + auto self = std::make_unique<RpcFixture>(f1); + f3 = self->port(); + TEST_BARRIER(); // #1 + auto *target = self->meta_connect(f5); + TEST_BARRIER(); // #2 + TEST_BARRIER(); // #3 + TEST_BARRIER(); // #4 + TEST_BARRIER(); // #5 + target->SubRef(); + } else if (thread_id == 2) { // client 1 (talks to server 1) + auto self = std::make_unique<RpcFixture>(f1); + f4 = self->port(); + TEST_BARRIER(); // #1 + auto *target = self->connect(f2); + TEST_BARRIER(); // #2 + ASSERT_TRUE(self->back_conn.load() != nullptr); + EXPECT_EQUAL(self->verify_rpc(target, f2), 3); + TEST_BARRIER(); // #3 + auto until = steady_clock::now() + 120s; + while ((self->verify_rpc(target, f2) > 0) && + (steady_clock::now() < until)) + { + // wait until peer is fully detached + } + TEST_BARRIER(); // #4 + EXPECT_EQUAL(self->verify_rpc(target, f2), 0); + TEST_BARRIER(); // #5 + target->SubRef(); + } else { // client 2 (talks to server 2) + ASSERT_EQUAL(thread_id, 3u); + auto self = std::make_unique<RpcFixture>(f1); + f5 = self->port(); + TEST_BARRIER(); // #1 + auto *target = self->connect(f3); + TEST_BARRIER(); // #2 + ASSERT_TRUE(self->back_conn.load() != nullptr); + EXPECT_EQUAL(self->verify_rpc(target, f3), 3); + TEST_BARRIER(); // #3 + TEST_BARRIER(); // #4 + EXPECT_EQUAL(self->verify_rpc(target, f3), 3); + TEST_BARRIER(); // #5 + target->SubRef(); + } +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index c5849e28d93..2677445e35d 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -546,6 +546,11 @@ FNET_Connection::Init() return true; } +FNET_IServerAdapter * +FNET_Connection::server_adapter() +{ + return _serverAdapter; +} bool FNET_Connection::handle_add_event() diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 1a5e8be0198..15150ffbb07 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -348,6 +348,8 @@ public: **/ bool Init(); + FNET_IServerAdapter *server_adapter() override; + /** * Called by the transport thread as the initial part of adding * this connection to the selection loop. If this is an incoming diff --git a/fnet/src/vespa/fnet/connector.cpp b/fnet/src/vespa/fnet/connector.cpp index 8ef355eb1fa..1d2002d6861 100644 --- a/fnet/src/vespa/fnet/connector.cpp +++ b/fnet/src/vespa/fnet/connector.cpp @@ -31,6 +31,13 @@ FNET_Connector::GetPortNumber() const { } +FNET_IServerAdapter * +FNET_Connector::server_adapter() +{ + return _serverAdapter; +} + + void FNET_Connector::Close() { diff --git a/fnet/src/vespa/fnet/connector.h b/fnet/src/vespa/fnet/connector.h index 058c8e670db..ff93a0a6a92 100644 --- a/fnet/src/vespa/fnet/connector.h +++ b/fnet/src/vespa/fnet/connector.h @@ -45,6 +45,8 @@ public: **/ uint32_t GetPortNumber() const; + FNET_IServerAdapter *server_adapter() override; + /** * Close this connector. This method must be called in the transport * thread in order to avoid race conditions related to socket event diff --git a/fnet/src/vespa/fnet/context.h b/fnet/src/vespa/fnet/context.h index e5594b80ea8..e327ba3fe14 100644 --- a/fnet/src/vespa/fnet/context.h +++ b/fnet/src/vespa/fnet/context.h @@ -10,6 +10,7 @@ class FNET_IOComponent; class FNET_Connector; class FNET_Connection; class FNET_Channel; +class FNET_IServerAdapter; class FNET_IExecutable; /** * This class indicates the context of a packet. It is external to the @@ -30,17 +31,20 @@ public: { _value.CONNECTOR = value; } FNET_Context(FNET_Connection *value) : _value() { _value.CONNECTION = value; } + FNET_Context(FNET_IServerAdapter *value) : _value() + { _value.SERVER_ADAPTER = value; } FNET_Context(FNET_IExecutable *value) : _value() { _value.EXECUTABLE = value; } union { - uint32_t INT; - void *VOIDP; - FNET_Channel *CHANNEL; - FNET_IOComponent *IOC; - FNET_Connector *CONNECTOR; - FNET_Connection *CONNECTION; - FNET_IExecutable *EXECUTABLE; + uint32_t INT; + void *VOIDP; + FNET_Channel *CHANNEL; + FNET_IOComponent *IOC; + FNET_Connector *CONNECTOR; + FNET_Connection *CONNECTION; + FNET_IServerAdapter *SERVER_ADAPTER; + FNET_IExecutable *EXECUTABLE; } _value; void Print(uint32_t indent = 0); diff --git a/fnet/src/vespa/fnet/controlpacket.cpp b/fnet/src/vespa/fnet/controlpacket.cpp index 580bff7be69..9ff69a76210 100644 --- a/fnet/src/vespa/fnet/controlpacket.cpp +++ b/fnet/src/vespa/fnet/controlpacket.cpp @@ -95,6 +95,9 @@ FNET_ControlPacket FNET_ControlPacket::IOCClose(FNET_CMD_IOC_CLOSE); FNET_ControlPacket +FNET_ControlPacket::DetachServerAdapter(FNET_CMD_DETACH_SERVER_ADAPTER); + +FNET_ControlPacket FNET_ControlPacket::Execute(FNET_CMD_EXECUTE); FNET_ControlPacket diff --git a/fnet/src/vespa/fnet/controlpacket.h b/fnet/src/vespa/fnet/controlpacket.h index 3d0bb8cd630..8dd2d034ae6 100644 --- a/fnet/src/vespa/fnet/controlpacket.h +++ b/fnet/src/vespa/fnet/controlpacket.h @@ -38,6 +38,7 @@ public: FNET_CMD_IOC_ENABLE_WRITE, FNET_CMD_IOC_HANDSHAKE_ACT, FNET_CMD_IOC_CLOSE, + FNET_CMD_DETACH_SERVER_ADAPTER, FNET_CMD_EXECUTE, FNET_CMD_TIMEOUT, FNET_CMD_BAD_PACKET, @@ -50,6 +51,7 @@ public: static FNET_ControlPacket IOCEnableWrite; static FNET_ControlPacket IOCHandshakeACT; static FNET_ControlPacket IOCClose; + static FNET_ControlPacket DetachServerAdapter; static FNET_ControlPacket Execute; static FNET_ControlPacket Timeout; static FNET_ControlPacket BadPacket; diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index 3a323390ad6..0a31b9d882b 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -10,10 +10,16 @@ #include <vespa/fastos/thread.h> #include <vespa/vespalib/util/require.h> +FNET_IPacketStreamer * +FRT_Supervisor::get_packet_streamer() { + static FRT_PacketFactory packet_factory; + static FNET_SimplePacketStreamer packet_streamer(&packet_factory); + return &packet_streamer; +} + + FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport) : _transport(transport), - _packetFactory(), - _packetStreamer(&_packetFactory), _connector(nullptr), _reflectionManager(), _rpcHooks(&_reflectionManager) @@ -27,6 +33,8 @@ FRT_Supervisor::~FRT_Supervisor() if (_connector != nullptr) { _connector->Owner()->Close(_connector, /* needref */ false); } + _transport->wait_for_pending_resolves(); + _transport->detach(this); _transport->sync(); } @@ -38,7 +46,7 @@ FRT_Supervisor::Listen(const char *spec) { if (_connector != nullptr) return false; - _connector = _transport->Listen(spec, &_packetStreamer, this); + _connector = _transport->Listen(spec, get_packet_streamer(), this); return (_connector != nullptr); } @@ -64,7 +72,7 @@ FRT_Supervisor::GetTarget(const char *spec) { FNET_TransportThread *thread = _transport->select_thread(spec, strlen(spec)); return new FRT_Target(thread->GetScheduler(), - thread->Connect(spec, &_packetStreamer)); + thread->Connect(spec, get_packet_streamer())); } @@ -73,7 +81,7 @@ FRT_Supervisor::Get2WayTarget(const char *spec, FNET_Context connContext) { FNET_TransportThread *thread = _transport->select_thread(spec, strlen(spec)); return new FRT_Target(thread->GetScheduler(), - thread->Connect(spec, &_packetStreamer, + thread->Connect(spec, get_packet_streamer(), this, connContext)); } diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h index 7756c738f09..610baca5d93 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.h +++ b/fnet/src/vespa/fnet/frt/supervisor.h @@ -44,12 +44,12 @@ public: private: FNET_Transport *_transport; - FRT_PacketFactory _packetFactory; - FNET_SimplePacketStreamer _packetStreamer; FNET_Connector *_connector; FRT_ReflectionManager _reflectionManager; RPCHooks _rpcHooks; + static FNET_IPacketStreamer *get_packet_streamer(); + public: explicit FRT_Supervisor(FNET_Transport *transport); FRT_Supervisor(const FRT_Supervisor &) = delete; diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index b4f061e5bc0..106e31c9236 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -8,6 +8,7 @@ #include <condition_variable> #include <chrono> +class FNET_IServerAdapter; class FNET_TransportThread; class FNET_Config; @@ -185,6 +186,14 @@ public: //----------- virtual methods below ----------------------// /** + * Used to identify which components are related to a specific + * server adapter to be able to perform partial shutdown. + * + * @return the server adapter attached to this component + **/ + virtual FNET_IServerAdapter *server_adapter() = 0; + + /** * This function is called as the first step of adding an io * component to the selection loop. The default implementation * will always return true. This can be overridden to perform diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index 7f0061cfdf5..0a79f324bb9 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -158,6 +158,11 @@ FNET_Transport::resolve_async(const vespalib::string &spec, _async_resolver->resolve_async(spec, std::move(result_handler)); } +void +FNET_Transport::wait_for_pending_resolves() { + _async_resolver->wait_for_pending_resolves(); +} + vespalib::CryptoSocket::UP FNET_Transport::create_client_crypto_socket(vespalib::SocketHandle socket, const vespalib::SocketSpec &spec) { @@ -212,6 +217,14 @@ FNET_Transport::sync() } } +void +FNET_Transport::detach(FNET_IServerAdapter *server_adapter) +{ + for (const auto &thread: _threads) { + thread->detach(server_adapter); + } +} + FNET_Scheduler * FNET_Transport::GetScheduler() { @@ -231,7 +244,7 @@ FNET_Transport::ShutDown(bool waitFinished) thread->ShutDown(waitFinished); } if (waitFinished) { - _async_resolver->wait_for_pending_resolves(); + wait_for_pending_resolves(); _work_pool->shutdown().sync(); } } @@ -242,7 +255,7 @@ FNET_Transport::WaitFinished() for (const auto &thread: _threads) { thread->WaitFinished(); } - _async_resolver->wait_for_pending_resolves(); + wait_for_pending_resolves(); _work_pool->shutdown().sync(); } diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 3d21234ac07..7dbfd80dfe7 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -160,6 +160,11 @@ public: vespalib::AsyncResolver::ResultHandler::WP result_handler); /** + * Wait for all pending resolve requests. + **/ + void wait_for_pending_resolves(); + + /** * Wrap a plain socket endpoint (client side) in a CryptoSocket. The * implementation will be determined by the CryptoEngine used by * this Transport. @@ -250,6 +255,18 @@ public: void sync(); /** + * Detach a server adapter from this transport. + * + * This will close all connectors and connections referencing the + * server adapter. Note that this is an async + * operation. 'wait_for_pending_resolves' should be called before + * this to make sure any in-flight connections are added + * first. 'sync' should be called after this to drain any pending + * call-backs. + **/ + void detach(FNET_IServerAdapter *server_adapter); + + /** * Obtain a pointer to a transport thread scheduler. * * @return transport thread scheduler. diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index cbecf417274..53363740bae 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -185,6 +185,22 @@ FNET_TransportThread::handle_close_cmd(FNET_IOComponent *ioc) } +void +FNET_TransportThread::handle_detach_server_adapter_cmd(FNET_IServerAdapter *server_adapter) +{ + FNET_IOComponent *component = _componentsHead; + while (component != nullptr) { + FNET_IOComponent *tmp = component; + component = component->_ioc_next; + if (tmp->server_adapter() == server_adapter) { + RemoveComponent(tmp); + tmp->Close(); + AddDeleteComponent(tmp); + } + } +} + + extern "C" { static void pipehandler(int) @@ -331,6 +347,11 @@ FNET_TransportThread::Close(FNET_IOComponent *comp, bool needRef) PostEvent(&FNET_ControlPacket::IOCClose, FNET_Context(comp)); } +void +FNET_TransportThread::detach(FNET_IServerAdapter *server_adapter) +{ + PostEvent(&FNET_ControlPacket::DetachServerAdapter, FNET_Context(server_adapter)); +} bool FNET_TransportThread::execute(FNET_IExecutable *exe) @@ -411,6 +432,11 @@ FNET_TransportThread::handle_wakeup() continue; } + if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_DETACH_SERVER_ADAPTER) { + handle_detach_server_adapter_cmd(context._value.SERVER_ADAPTER); + continue; + } + if (context._value.IOC->_flags._ioc_delete) { context._value.IOC->SubRef(); continue; diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 4a470fc1e29..c120894ac9c 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -143,6 +143,7 @@ private: void handle_add_cmd(FNET_IOComponent *ioc); void handle_close_cmd(FNET_IOComponent *ioc); + void handle_detach_server_adapter_cmd(FNET_IServerAdapter *server_adapter); /** * This method is called to initialize the transport thread event @@ -334,6 +335,17 @@ public: **/ void Close(FNET_IOComponent *comp, bool needRef = true); + /** + * Detach a server adapter from this transport. + * + * This will close all connectors and connections referencing the + * server adapter. Note that this is an async + * operation. 'wait_for_pending_resolves' (on the owning + * Transport) should be called before this to make sure any + * in-flight connections are added first. 'sync' should be called + * after this to drain any pending call-backs. + **/ + void detach(FNET_IServerAdapter *server_adapter); /** * Post an execution event on the transport event queue. The return |