summaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2022-03-29 08:23:03 +0000
committerHåvard Pettersen <havardpe@oath.com>2022-03-30 13:03:33 +0000
commit0d893528b601c6165bb010da0b11018e36a259fa (patch)
tree83f99e11ac8ee825b83baeec6202b9b52fcca596 /fnet
parent58200e0e03baf5fc712fd28313622bc8b4515dca (diff)
enable detaching a supervisor from a running transport
This is done by closing all connectors and connections related to the supervisor (via the server adapter interface). Also; the packet streamer was made a singleton to avoid additional (unneeded) references to the supervisor object.
Diffstat (limited to 'fnet')
-rw-r--r--fnet/CMakeLists.txt1
-rw-r--r--fnet/src/tests/frt/detach_supervisor/CMakeLists.txt8
-rw-r--r--fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp188
-rw-r--r--fnet/src/vespa/fnet/connection.cpp5
-rw-r--r--fnet/src/vespa/fnet/connection.h2
-rw-r--r--fnet/src/vespa/fnet/connector.cpp7
-rw-r--r--fnet/src/vespa/fnet/connector.h2
-rw-r--r--fnet/src/vespa/fnet/context.h18
-rw-r--r--fnet/src/vespa/fnet/controlpacket.cpp3
-rw-r--r--fnet/src/vespa/fnet/controlpacket.h2
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp18
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.h4
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h9
-rw-r--r--fnet/src/vespa/fnet/transport.cpp17
-rw-r--r--fnet/src/vespa/fnet/transport.h17
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp26
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h12
17 files changed, 323 insertions, 16 deletions
diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt
index 8b21777a3d5..eeef7f63876 100644
--- a/fnet/CMakeLists.txt
+++ b/fnet/CMakeLists.txt
@@ -17,6 +17,7 @@ vespa_define_module(
src/tests/connection_spread
src/tests/databuffer
src/tests/examples
+ src/tests/frt/detach_supervisor
src/tests/frt/method_pt
src/tests/frt/parallel_rpc
src/tests/frt/rpc
diff --git a/fnet/src/tests/frt/detach_supervisor/CMakeLists.txt b/fnet/src/tests/frt/detach_supervisor/CMakeLists.txt
new file mode 100644
index 00000000000..e434177d331
--- /dev/null
+++ b/fnet/src/tests/frt/detach_supervisor/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(fnet_detach_supervisor_test_app TEST
+ SOURCES
+ detach_supervisor_test.cpp
+ DEPENDS
+ fnet
+)
+vespa_add_test(NAME fnet_detach_supervisor_test_app COMMAND fnet_detach_supervisor_test_app)
diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
new file mode 100644
index 00000000000..9f251d4d491
--- /dev/null
+++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
@@ -0,0 +1,188 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/transport_thread.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/vespalib/net/crypto_engine.h>
+#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/time.h>
+#include <vespa/fastos/thread.h>
+
+using namespace vespalib;
+using vespalib::make_string_short::fmt;
+
+CryptoEngine::SP null_crypto = std::make_shared<NullCryptoEngine>();
+
+struct BasicFixture {
+ FastOS_ThreadPool thread_pool;
+ FNET_Transport transport;
+ BasicFixture() : thread_pool(128_Ki), transport(fnet::TransportConfig(4).crypto(null_crypto)) {
+ ASSERT_TRUE(transport.Start(&thread_pool));
+ }
+ ~BasicFixture() {
+ transport.ShutDown(true);
+ thread_pool.Close();
+ }
+};
+
+struct RpcFixture : FRT_Invokable {
+ FRT_Supervisor orb;
+ std::atomic<FNET_Connection *> back_conn;
+ RpcFixture(BasicFixture &basic) : orb(&basic.transport), back_conn(nullptr) {
+ init_rpc();
+ ASSERT_TRUE(orb.Listen(0));
+ }
+ ~RpcFixture() {
+ if (back_conn.load() != nullptr) {
+ back_conn.load()->SubRef();
+ }
+ }
+ uint32_t port() const { return orb.GetListenPort(); }
+ FRT_Target *connect(uint32_t port) {
+ return orb.GetTarget(port);
+ }
+ void init_rpc() {
+ FRT_ReflectionBuilder rb(&orb);
+ rb.DefineMethod("inc", "l", "l", FRT_METHOD(RpcFixture::rpc_inc), this);
+ rb.MethodDesc("increment a 64-bit integer");
+ rb.ParamDesc("in", "an integer (64 bit)");
+ rb.ReturnDesc("out", "in + 1 (64 bit)");
+ rb.DefineMethod("connect", "", "", FRT_METHOD(RpcFixture::rpc_connect), this);
+ rb.MethodDesc("capture 2way connection");
+ }
+ void rpc_inc(FRT_RPCRequest *req) {
+ FRT_Values &params = *req->GetParams();
+ FRT_Values &ret = *req->GetReturn();
+ ret.AddInt64(params[0]._intval64 + 1);
+ }
+ void rpc_connect(FRT_RPCRequest *req) {
+ ASSERT_TRUE(back_conn.load() == nullptr);
+ back_conn.store(req->GetConnection());
+ ASSERT_TRUE(back_conn.load() != nullptr);
+ back_conn.load()->AddRef();
+ }
+ FRT_Target *meta_connect(uint32_t port) {
+ auto *target = orb.Get2WayTarget(fmt("tcp/localhost:%u", port).c_str());
+ auto *req = orb.AllocRPCRequest();
+ req->SetMethodName("connect");
+ target->InvokeSync(req, 300.0);
+ ASSERT_TRUE(req->CheckReturnTypes(""));
+ req->SubRef();
+ return target;
+ };
+ int check_result(FRT_RPCRequest *req, uint64_t expect) {
+ int num_ok = 0;
+ if (!req->CheckReturnTypes("l")) {
+ ASSERT_EQUAL(req->GetErrorCode(), FRTE_RPC_CONNECTION);
+ } else {
+ uint64_t ret = req->GetReturn()->GetValue(0)._intval64;
+ ASSERT_EQUAL(ret, expect);
+ ++num_ok;
+ }
+ req->SubRef();
+ return num_ok;
+ }
+ int verify_rpc(FNET_Connection *conn) {
+ auto *req = orb.AllocRPCRequest();
+ req->SetMethodName("inc");
+ req->GetParams()->AddInt64(7);
+ FRT_Supervisor::InvokeSync(conn->Owner()->GetScheduler(), conn, req, 300.0);
+ return check_result(req, 8);
+ }
+ int verify_rpc(FRT_Target *target) {
+ auto *req = orb.AllocRPCRequest();
+ req->SetMethodName("inc");
+ req->GetParams()->AddInt64(4);
+ target->InvokeSync(req, 300.0);
+ return check_result(req, 5);
+ }
+ int verify_rpc(FRT_Target *target, uint32_t port) {
+ auto *my_target = connect(port);
+ int num_ok = verify_rpc(target) + verify_rpc(my_target) + verify_rpc(back_conn.load());
+ my_target->SubRef();
+ return num_ok;
+ }
+};
+
+// test timeline:
+//
+// listen and export server ports
+// --- #1 ---
+// connect to target peer
+// --- #2 ---
+// verify that rpc works (persistent, transient, 2way)
+// --- #3 ---
+// detach supervisor while talking to it
+// --- #4 ---
+// verify that non-detached supervisor still works
+// --- #5 ---
+// test cleanup
+
+TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, BasicFixture(), uint32_t(), uint32_t(), uint32_t(), uint32_t()) {
+ if (thread_id == 0) { // server 1 (talks to client 1)
+ auto self = std::make_unique<RpcFixture>(f1);
+ f2 = self->port();
+ TEST_BARRIER(); // #1
+ auto *target = self->meta_connect(f4);
+ auto *client_target = self->connect(f3);
+ TEST_BARRIER(); // #2
+ TEST_BARRIER(); // #3
+ std::this_thread::sleep_for(50ms);
+ self.reset(); // <--- detach supervisor for server 1
+ TEST_BARRIER(); // #4
+ EXPECT_EQUAL(self->verify_rpc(target), 0); // outgoing 2way target should be closed
+ EXPECT_EQUAL(self->verify_rpc(client_target), 1); // pure client target should not be closed
+ TEST_BARRIER(); // #5
+ target->SubRef();
+ client_target->SubRef();
+ } else if (thread_id == 1) { // server 2 (talks to client 2)
+ auto self = std::make_unique<RpcFixture>(f1);
+ f3 = self->port();
+ TEST_BARRIER(); // #1
+ auto *target = self->meta_connect(f5);
+ TEST_BARRIER(); // #2
+ TEST_BARRIER(); // #3
+ TEST_BARRIER(); // #4
+ TEST_BARRIER(); // #5
+ target->SubRef();
+ } else if (thread_id == 2) { // client 1 (talks to server 1)
+ auto self = std::make_unique<RpcFixture>(f1);
+ f4 = self->port();
+ TEST_BARRIER(); // #1
+ auto *target = self->connect(f2);
+ TEST_BARRIER(); // #2
+ ASSERT_TRUE(self->back_conn.load() != nullptr);
+ EXPECT_EQUAL(self->verify_rpc(target, f2), 3);
+ TEST_BARRIER(); // #3
+ auto until = steady_clock::now() + 120s;
+ while ((self->verify_rpc(target, f2) > 0) &&
+ (steady_clock::now() < until))
+ {
+ // wait until peer is fully detached
+ }
+ TEST_BARRIER(); // #4
+ EXPECT_EQUAL(self->verify_rpc(target, f2), 0);
+ TEST_BARRIER(); // #5
+ target->SubRef();
+ } else { // client 2 (talks to server 2)
+ ASSERT_EQUAL(thread_id, 3u);
+ auto self = std::make_unique<RpcFixture>(f1);
+ f5 = self->port();
+ TEST_BARRIER(); // #1
+ auto *target = self->connect(f3);
+ TEST_BARRIER(); // #2
+ ASSERT_TRUE(self->back_conn.load() != nullptr);
+ EXPECT_EQUAL(self->verify_rpc(target, f3), 3);
+ TEST_BARRIER(); // #3
+ TEST_BARRIER(); // #4
+ EXPECT_EQUAL(self->verify_rpc(target, f3), 3);
+ TEST_BARRIER(); // #5
+ target->SubRef();
+ }
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index c5849e28d93..2677445e35d 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -546,6 +546,11 @@ FNET_Connection::Init()
return true;
}
+FNET_IServerAdapter *
+FNET_Connection::server_adapter()
+{
+ return _serverAdapter;
+}
bool
FNET_Connection::handle_add_event()
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 1a5e8be0198..15150ffbb07 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -348,6 +348,8 @@ public:
**/
bool Init();
+ FNET_IServerAdapter *server_adapter() override;
+
/**
* Called by the transport thread as the initial part of adding
* this connection to the selection loop. If this is an incoming
diff --git a/fnet/src/vespa/fnet/connector.cpp b/fnet/src/vespa/fnet/connector.cpp
index 8ef355eb1fa..1d2002d6861 100644
--- a/fnet/src/vespa/fnet/connector.cpp
+++ b/fnet/src/vespa/fnet/connector.cpp
@@ -31,6 +31,13 @@ FNET_Connector::GetPortNumber() const {
}
+FNET_IServerAdapter *
+FNET_Connector::server_adapter()
+{
+ return _serverAdapter;
+}
+
+
void
FNET_Connector::Close()
{
diff --git a/fnet/src/vespa/fnet/connector.h b/fnet/src/vespa/fnet/connector.h
index 058c8e670db..ff93a0a6a92 100644
--- a/fnet/src/vespa/fnet/connector.h
+++ b/fnet/src/vespa/fnet/connector.h
@@ -45,6 +45,8 @@ public:
**/
uint32_t GetPortNumber() const;
+ FNET_IServerAdapter *server_adapter() override;
+
/**
* Close this connector. This method must be called in the transport
* thread in order to avoid race conditions related to socket event
diff --git a/fnet/src/vespa/fnet/context.h b/fnet/src/vespa/fnet/context.h
index e5594b80ea8..e327ba3fe14 100644
--- a/fnet/src/vespa/fnet/context.h
+++ b/fnet/src/vespa/fnet/context.h
@@ -10,6 +10,7 @@ class FNET_IOComponent;
class FNET_Connector;
class FNET_Connection;
class FNET_Channel;
+class FNET_IServerAdapter;
class FNET_IExecutable;
/**
* This class indicates the context of a packet. It is external to the
@@ -30,17 +31,20 @@ public:
{ _value.CONNECTOR = value; }
FNET_Context(FNET_Connection *value) : _value()
{ _value.CONNECTION = value; }
+ FNET_Context(FNET_IServerAdapter *value) : _value()
+ { _value.SERVER_ADAPTER = value; }
FNET_Context(FNET_IExecutable *value) : _value()
{ _value.EXECUTABLE = value; }
union {
- uint32_t INT;
- void *VOIDP;
- FNET_Channel *CHANNEL;
- FNET_IOComponent *IOC;
- FNET_Connector *CONNECTOR;
- FNET_Connection *CONNECTION;
- FNET_IExecutable *EXECUTABLE;
+ uint32_t INT;
+ void *VOIDP;
+ FNET_Channel *CHANNEL;
+ FNET_IOComponent *IOC;
+ FNET_Connector *CONNECTOR;
+ FNET_Connection *CONNECTION;
+ FNET_IServerAdapter *SERVER_ADAPTER;
+ FNET_IExecutable *EXECUTABLE;
} _value;
void Print(uint32_t indent = 0);
diff --git a/fnet/src/vespa/fnet/controlpacket.cpp b/fnet/src/vespa/fnet/controlpacket.cpp
index 580bff7be69..9ff69a76210 100644
--- a/fnet/src/vespa/fnet/controlpacket.cpp
+++ b/fnet/src/vespa/fnet/controlpacket.cpp
@@ -95,6 +95,9 @@ FNET_ControlPacket
FNET_ControlPacket::IOCClose(FNET_CMD_IOC_CLOSE);
FNET_ControlPacket
+FNET_ControlPacket::DetachServerAdapter(FNET_CMD_DETACH_SERVER_ADAPTER);
+
+FNET_ControlPacket
FNET_ControlPacket::Execute(FNET_CMD_EXECUTE);
FNET_ControlPacket
diff --git a/fnet/src/vespa/fnet/controlpacket.h b/fnet/src/vespa/fnet/controlpacket.h
index 3d0bb8cd630..8dd2d034ae6 100644
--- a/fnet/src/vespa/fnet/controlpacket.h
+++ b/fnet/src/vespa/fnet/controlpacket.h
@@ -38,6 +38,7 @@ public:
FNET_CMD_IOC_ENABLE_WRITE,
FNET_CMD_IOC_HANDSHAKE_ACT,
FNET_CMD_IOC_CLOSE,
+ FNET_CMD_DETACH_SERVER_ADAPTER,
FNET_CMD_EXECUTE,
FNET_CMD_TIMEOUT,
FNET_CMD_BAD_PACKET,
@@ -50,6 +51,7 @@ public:
static FNET_ControlPacket IOCEnableWrite;
static FNET_ControlPacket IOCHandshakeACT;
static FNET_ControlPacket IOCClose;
+ static FNET_ControlPacket DetachServerAdapter;
static FNET_ControlPacket Execute;
static FNET_ControlPacket Timeout;
static FNET_ControlPacket BadPacket;
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index 3a323390ad6..0a31b9d882b 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -10,10 +10,16 @@
#include <vespa/fastos/thread.h>
#include <vespa/vespalib/util/require.h>
+FNET_IPacketStreamer *
+FRT_Supervisor::get_packet_streamer() {
+ static FRT_PacketFactory packet_factory;
+ static FNET_SimplePacketStreamer packet_streamer(&packet_factory);
+ return &packet_streamer;
+}
+
+
FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport)
: _transport(transport),
- _packetFactory(),
- _packetStreamer(&_packetFactory),
_connector(nullptr),
_reflectionManager(),
_rpcHooks(&_reflectionManager)
@@ -27,6 +33,8 @@ FRT_Supervisor::~FRT_Supervisor()
if (_connector != nullptr) {
_connector->Owner()->Close(_connector, /* needref */ false);
}
+ _transport->wait_for_pending_resolves();
+ _transport->detach(this);
_transport->sync();
}
@@ -38,7 +46,7 @@ FRT_Supervisor::Listen(const char *spec)
{
if (_connector != nullptr)
return false;
- _connector = _transport->Listen(spec, &_packetStreamer, this);
+ _connector = _transport->Listen(spec, get_packet_streamer(), this);
return (_connector != nullptr);
}
@@ -64,7 +72,7 @@ FRT_Supervisor::GetTarget(const char *spec)
{
FNET_TransportThread *thread = _transport->select_thread(spec, strlen(spec));
return new FRT_Target(thread->GetScheduler(),
- thread->Connect(spec, &_packetStreamer));
+ thread->Connect(spec, get_packet_streamer()));
}
@@ -73,7 +81,7 @@ FRT_Supervisor::Get2WayTarget(const char *spec, FNET_Context connContext)
{
FNET_TransportThread *thread = _transport->select_thread(spec, strlen(spec));
return new FRT_Target(thread->GetScheduler(),
- thread->Connect(spec, &_packetStreamer,
+ thread->Connect(spec, get_packet_streamer(),
this, connContext));
}
diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h
index 7756c738f09..610baca5d93 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.h
+++ b/fnet/src/vespa/fnet/frt/supervisor.h
@@ -44,12 +44,12 @@ public:
private:
FNET_Transport *_transport;
- FRT_PacketFactory _packetFactory;
- FNET_SimplePacketStreamer _packetStreamer;
FNET_Connector *_connector;
FRT_ReflectionManager _reflectionManager;
RPCHooks _rpcHooks;
+ static FNET_IPacketStreamer *get_packet_streamer();
+
public:
explicit FRT_Supervisor(FNET_Transport *transport);
FRT_Supervisor(const FRT_Supervisor &) = delete;
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index b4f061e5bc0..106e31c9236 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -8,6 +8,7 @@
#include <condition_variable>
#include <chrono>
+class FNET_IServerAdapter;
class FNET_TransportThread;
class FNET_Config;
@@ -185,6 +186,14 @@ public:
//----------- virtual methods below ----------------------//
/**
+ * Used to identify which components are related to a specific
+ * server adapter to be able to perform partial shutdown.
+ *
+ * @return the server adapter attached to this component
+ **/
+ virtual FNET_IServerAdapter *server_adapter() = 0;
+
+ /**
* This function is called as the first step of adding an io
* component to the selection loop. The default implementation
* will always return true. This can be overridden to perform
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index 7f0061cfdf5..0a79f324bb9 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -158,6 +158,11 @@ FNET_Transport::resolve_async(const vespalib::string &spec,
_async_resolver->resolve_async(spec, std::move(result_handler));
}
+void
+FNET_Transport::wait_for_pending_resolves() {
+ _async_resolver->wait_for_pending_resolves();
+}
+
vespalib::CryptoSocket::UP
FNET_Transport::create_client_crypto_socket(vespalib::SocketHandle socket, const vespalib::SocketSpec &spec)
{
@@ -212,6 +217,14 @@ FNET_Transport::sync()
}
}
+void
+FNET_Transport::detach(FNET_IServerAdapter *server_adapter)
+{
+ for (const auto &thread: _threads) {
+ thread->detach(server_adapter);
+ }
+}
+
FNET_Scheduler *
FNET_Transport::GetScheduler()
{
@@ -231,7 +244,7 @@ FNET_Transport::ShutDown(bool waitFinished)
thread->ShutDown(waitFinished);
}
if (waitFinished) {
- _async_resolver->wait_for_pending_resolves();
+ wait_for_pending_resolves();
_work_pool->shutdown().sync();
}
}
@@ -242,7 +255,7 @@ FNET_Transport::WaitFinished()
for (const auto &thread: _threads) {
thread->WaitFinished();
}
- _async_resolver->wait_for_pending_resolves();
+ wait_for_pending_resolves();
_work_pool->shutdown().sync();
}
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 3d21234ac07..7dbfd80dfe7 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -160,6 +160,11 @@ public:
vespalib::AsyncResolver::ResultHandler::WP result_handler);
/**
+ * Wait for all pending resolve requests.
+ **/
+ void wait_for_pending_resolves();
+
+ /**
* Wrap a plain socket endpoint (client side) in a CryptoSocket. The
* implementation will be determined by the CryptoEngine used by
* this Transport.
@@ -250,6 +255,18 @@ public:
void sync();
/**
+ * Detach a server adapter from this transport.
+ *
+ * This will close all connectors and connections referencing the
+ * server adapter. Note that this is an async
+ * operation. 'wait_for_pending_resolves' should be called before
+ * this to make sure any in-flight connections are added
+ * first. 'sync' should be called after this to drain any pending
+ * call-backs.
+ **/
+ void detach(FNET_IServerAdapter *server_adapter);
+
+ /**
* Obtain a pointer to a transport thread scheduler.
*
* @return transport thread scheduler.
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index cbecf417274..53363740bae 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -185,6 +185,22 @@ FNET_TransportThread::handle_close_cmd(FNET_IOComponent *ioc)
}
+void
+FNET_TransportThread::handle_detach_server_adapter_cmd(FNET_IServerAdapter *server_adapter)
+{
+ FNET_IOComponent *component = _componentsHead;
+ while (component != nullptr) {
+ FNET_IOComponent *tmp = component;
+ component = component->_ioc_next;
+ if (tmp->server_adapter() == server_adapter) {
+ RemoveComponent(tmp);
+ tmp->Close();
+ AddDeleteComponent(tmp);
+ }
+ }
+}
+
+
extern "C" {
static void pipehandler(int)
@@ -331,6 +347,11 @@ FNET_TransportThread::Close(FNET_IOComponent *comp, bool needRef)
PostEvent(&FNET_ControlPacket::IOCClose, FNET_Context(comp));
}
+void
+FNET_TransportThread::detach(FNET_IServerAdapter *server_adapter)
+{
+ PostEvent(&FNET_ControlPacket::DetachServerAdapter, FNET_Context(server_adapter));
+}
bool
FNET_TransportThread::execute(FNET_IExecutable *exe)
@@ -411,6 +432,11 @@ FNET_TransportThread::handle_wakeup()
continue;
}
+ if (packet->GetCommand() == FNET_ControlPacket::FNET_CMD_DETACH_SERVER_ADAPTER) {
+ handle_detach_server_adapter_cmd(context._value.SERVER_ADAPTER);
+ continue;
+ }
+
if (context._value.IOC->_flags._ioc_delete) {
context._value.IOC->SubRef();
continue;
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 4a470fc1e29..c120894ac9c 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -143,6 +143,7 @@ private:
void handle_add_cmd(FNET_IOComponent *ioc);
void handle_close_cmd(FNET_IOComponent *ioc);
+ void handle_detach_server_adapter_cmd(FNET_IServerAdapter *server_adapter);
/**
* This method is called to initialize the transport thread event
@@ -334,6 +335,17 @@ public:
**/
void Close(FNET_IOComponent *comp, bool needRef = true);
+ /**
+ * Detach a server adapter from this transport.
+ *
+ * This will close all connectors and connections referencing the
+ * server adapter. Note that this is an async
+ * operation. 'wait_for_pending_resolves' (on the owning
+ * Transport) should be called before this to make sure any
+ * in-flight connections are added first. 'sync' should be called
+ * after this to drain any pending call-backs.
+ **/
+ void detach(FNET_IServerAdapter *server_adapter);
/**
* Post an execution event on the transport event queue. The return