aboutsummaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-03-06 11:42:42 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-03-06 15:11:11 +0000
commit793a0bbc6c8bedcf0ee5de51be687d760defcd57 (patch)
tree016f83e652b0734aa3555df9332c72f9a82ff424 /fnet
parent9a95875744e88f29cb4a78f8bb31bab13e7cec4d (diff)
use ref_counted in fnet
also get rid of some cleanup functions on reference counted classes enable specifying low-level parameters to addref/subref (cnt/reserve)
Diffstat (limited to 'fnet')
-rw-r--r--fnet/src/examples/frt/rpc/echo_client.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_client.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_server.cpp2
-rw-r--r--fnet/src/examples/frt/rpc/rpc_client.cpp8
-rw-r--r--fnet/src/examples/frt/rpc/rpc_info.cpp8
-rw-r--r--fnet/src/examples/frt/rpc/rpc_invoke.cpp4
-rw-r--r--fnet/src/examples/ping/pingclient.cpp2
-rw-r--r--fnet/src/examples/ping/pingserver.cpp2
-rw-r--r--fnet/src/tests/connect/connect_test.cpp71
-rw-r--r--fnet/src/tests/connection_spread/connection_spread_test.cpp4
-rw-r--r--fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp20
-rw-r--r--fnet/src/tests/frt/method_pt/method_pt.cpp8
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp4
-rw-r--r--fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp6
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp10
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp6
-rw-r--r--fnet/src/tests/frt/rpc/sharedblob.cpp10
-rw-r--r--fnet/src/tests/info/info.cpp6
-rw-r--r--fnet/src/tests/printstuff/printstuff_test.cpp2
-rw-r--r--fnet/src/tests/transport_debugger/transport_debugger_test.cpp6
-rw-r--r--fnet/src/vespa/fnet/connection.cpp64
-rw-r--r--fnet/src/vespa/fnet/connection.h42
-rw-r--r--fnet/src/vespa/fnet/frt/invoker.cpp4
-rw-r--r--fnet/src/vespa/fnet/frt/packets.cpp2
-rw-r--r--fnet/src/vespa/fnet/frt/reflection.cpp2
-rw-r--r--fnet/src/vespa/fnet/frt/rpcrequest.cpp34
-rw-r--r--fnet/src/vespa/fnet/frt/rpcrequest.h23
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp6
-rw-r--r--fnet/src/vespa/fnet/frt/target.cpp1
-rw-r--r--fnet/src/vespa/fnet/frt/target.h17
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp58
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h48
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp26
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h10
34 files changed, 159 insertions, 365 deletions
diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp
index 0176337c466..869a010bfda 100644
--- a/fnet/src/examples/frt/rpc/echo_client.cpp
+++ b/fnet/src/examples/frt/rpc/echo_client.cpp
@@ -85,8 +85,8 @@ EchoClient::main(int argc, char **argv)
} else {
printf("Return values != parameters.\n");
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
index b41c40ba29d..c52d48f24eb 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
@@ -102,8 +102,8 @@ MyApp::main(int argc, char **argv)
ok = false;
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return ok ? 0 : 1;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
index 37a819c6ee7..5c21f73da7f 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
@@ -27,7 +27,7 @@ void do_callback(FRT_RPCRequest *req) {
cb->GetErrorCode(),
cb->GetErrorMessage());
}
- cb->SubRef();
+ cb->internal_subref();
req->Return();
}
diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp
index acb20a880c9..55168a2acba 100644
--- a/fnet/src/examples/frt/rpc/rpc_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_client.cpp
@@ -49,7 +49,7 @@ RPCClient::main(int argc, char **argv)
}
fprintf(stdout, "\nTesting addFloat method\n");
- req->SubRef();
+ req->internal_subref();
req = supervisor.AllocRPCRequest();
req->SetMethodName("addFloat");
req->GetParams()->AddFloat(float1);
@@ -65,7 +65,7 @@ RPCClient::main(int argc, char **argv)
}
fprintf(stdout, "\nTesting addDouble method\n");
- req->SubRef();
+ req->internal_subref();
req = supervisor.AllocRPCRequest();
req->SetMethodName("addDouble");
req->GetParams()->AddDouble(double1);
@@ -80,8 +80,8 @@ RPCClient::main(int argc, char **argv)
req->GetErrorMessage());
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp
index 9734342a24e..ab534a254c2 100644
--- a/fnet/src/examples/frt/rpc/rpc_info.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_info.cpp
@@ -15,16 +15,16 @@ public:
void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor)
{
if ((*req) != nullptr)
- (*req)->SubRef();
+ (*req)->internal_subref();
(*req) = supervisor->AllocRPCRequest();
}
void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2)
{
if (r1 != nullptr)
- r1->SubRef();
+ r1->internal_subref();
if (r2 != nullptr)
- r2->SubRef();
+ r2->internal_subref();
}
void DumpMethodInfo(const char *indent, FRT_RPCRequest *info,
@@ -130,7 +130,7 @@ RPCInfo::main(int argc, char **argv)
m_list->GetErrorMessage());
}
FreeReqs(m_list, info);
- target->SubRef();
+ target->internal_subref();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
index 9f3e90f469a..d56847098d8 100644
--- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
@@ -113,8 +113,8 @@ RPCClient::run(int argc, char **argv)
retCode = 3;
}
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return retCode;
}
diff --git a/fnet/src/examples/ping/pingclient.cpp b/fnet/src/examples/ping/pingclient.cpp
index 43296df7e57..b59df31607a 100644
--- a/fnet/src/examples/ping/pingclient.cpp
+++ b/fnet/src/examples/ping/pingclient.cpp
@@ -86,7 +86,7 @@ PingClient::main(int argc, char **argv)
packet->Free();
}
if (conn != nullptr)
- conn->SubRef();
+ conn->internal_subref();
transport.ShutDown(true);
return 0;
}
diff --git a/fnet/src/examples/ping/pingserver.cpp b/fnet/src/examples/ping/pingserver.cpp
index fb5b12b66c0..79a67cd18a7 100644
--- a/fnet/src/examples/ping/pingserver.cpp
+++ b/fnet/src/examples/ping/pingserver.cpp
@@ -51,7 +51,7 @@ PingServer::main(int argc, char **argv)
FNET_Connector *listener =
transport.Listen(argv[1], &streamer, this);
if (listener != nullptr)
- listener->SubRef();
+ listener->internal_subref();
FNET_SignalShutDown ssd(transport);
transport.Main();
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp
index 9d566cf37a7..d635fea6f94 100644
--- a/fnet/src/tests/connect/connect_test.cpp
+++ b/fnet/src/tests/connect/connect_test.cpp
@@ -86,25 +86,22 @@ struct BlockingCryptoEngine : public CryptoEngine {
//-----------------------------------------------------------------------------
-struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
+struct TransportFixture : FNET_IPacketHandler {
FNET_SimplePacketStreamer streamer;
FNET_Transport transport;
Gate conn_lost;
- Gate conn_deleted;
- TransportFixture() : streamer(nullptr), transport(),
- conn_lost(), conn_deleted()
- {
+ TransportFixture() : streamer(nullptr), transport(), conn_lost() {
transport.Start();
}
TransportFixture(AsyncResolver::HostResolver::SP host_resolver)
: streamer(nullptr), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))),
- conn_lost(), conn_deleted()
+ conn_lost()
{
transport.Start();
}
TransportFixture(CryptoEngine::SP crypto)
: streamer(nullptr), transport(fnet::TransportConfig().crypto(std::move(crypto))),
- conn_lost(), conn_deleted()
+ conn_lost()
{
transport.Start();
}
@@ -114,14 +111,12 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
packet->Free();
return FNET_FREE_CHANNEL;
}
- void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); }
FNET_Connection *connect(const vespalib::string &spec) {
FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer);
ASSERT_TRUE(conn != nullptr);
if (conn->OpenChannel(this, FNET_Context()) == nullptr) {
conn_lost.countDown();
}
- conn->SetCleanupHandler(this);
return conn;
}
~TransportFixture() override {
@@ -131,8 +126,26 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
//-----------------------------------------------------------------------------
-TEST_MT_FFF("require that normal connect works", 2,
- ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60))
+struct ConnCheck {
+ uint64_t target;
+ ConnCheck() : target(FNET_Connection::get_num_connections()) {
+ EXPECT_EQUAL(target, uint64_t(0));
+ }
+ bool at_target() const { return (FNET_Connection::get_num_connections() == target); };
+ bool await(duration max_wait) const {
+ auto until = saturated_add(steady_clock::now(), max_wait);
+ while (!at_target() && steady_clock::now() < until) {
+ std::this_thread::sleep_for(1ms);
+ }
+ return at_target();
+ }
+ void await() const {
+ ASSERT_TRUE(await(3600s));
+ }
+};
+
+TEST_MT_FFFF("require that normal connect works", 2,
+ ServerSocket("tcp/0"), TransportFixture(), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -144,23 +157,23 @@ TEST_MT_FFF("require that normal connect works", 2,
TEST_BARRIER();
conn->Owner()->Close(conn);
f2.conn_lost.await();
- EXPECT_TRUE(!f2.conn_deleted.await(short_time));
- conn->SubRef();
- f2.conn_deleted.await();
+ EXPECT_TRUE(!f3.await(short_time));
+ conn->internal_subref();
+ f3.await();
}
}
-TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) {
+TEST_FFF("require that bogus connect fail asynchronously", TransportFixture(), ConnCheck(), TimeBomb(60)) {
FNET_Connection *conn = f1.connect("invalid");
f1.conn_lost.await();
- EXPECT_TRUE(!f1.conn_deleted.await(short_time));
- conn->SubRef();
- f1.conn_deleted.await();
+ EXPECT_TRUE(!f2.await(short_time));
+ conn->internal_subref();
+ f2.await();
}
-TEST_MT_FFFF("require that async close can be called before async resolve completes", 2,
- ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()),
- TransportFixture(f2), TimeBomb(60))
+TEST_MT_FFFFF("require that async close can be called before async resolve completes", 2,
+ ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()),
+ TransportFixture(f2), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -172,16 +185,16 @@ TEST_MT_FFFF("require that async close can be called before async resolve comple
conn->Owner()->Close(conn);
f3.conn_lost.await();
f2->release_caller();
- EXPECT_TRUE(!f3.conn_deleted.await(short_time));
- conn->SubRef();
- f3.conn_deleted.await();
+ EXPECT_TRUE(!f4.await(short_time));
+ conn->internal_subref();
+ f4.await();
f1.shutdown();
}
}
-TEST_MT_FFFF("require that async close during async do_handshake_work works", 2,
- ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()),
- TransportFixture(f2), TimeBomb(60))
+TEST_MT_FFFFF("require that async close during async do_handshake_work works", 2,
+ ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()),
+ TransportFixture(f2), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -196,10 +209,10 @@ TEST_MT_FFFF("require that async close during async do_handshake_work works", 2,
f3.conn_lost.await();
TEST_BARRIER(); // #1
// verify that pending work keeps relevant objects alive
- EXPECT_TRUE(!f3.conn_deleted.await(short_time));
+ EXPECT_TRUE(!f4.await(short_time));
EXPECT_TRUE(!f2->handshake_socket_deleted.await(short_time));
f2->handshake_work_exit.countDown();
- f3.conn_deleted.await();
+ f4.await();
f2->handshake_socket_deleted.await();
}
}
diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp
index 6286ce65657..5908e6a4982 100644
--- a/fnet/src/tests/connection_spread/connection_spread_test.cpp
+++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp
@@ -84,9 +84,9 @@ TEST_F("require that connections are spread among transport threads", Fixture)
f1.wait_for_components(256, 257);
check_threads(f1.client, 8, "client");
check_threads(f1.server, 8, "server");
- listener->SubRef();
+ listener->internal_subref();
for (FNET_Connection *conn: connections) {
- conn->SubRef();
+ conn->internal_subref();
}
}
diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
index 6325c60413a..c5ca6dc6ce9 100644
--- a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
+++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
@@ -36,7 +36,7 @@ struct RpcFixture : FRT_Invokable {
}
~RpcFixture() {
if (back_conn.load() != nullptr) {
- back_conn.load()->SubRef();
+ back_conn.load()->internal_subref();
}
}
uint32_t port() const { return orb.GetListenPort(); }
@@ -61,7 +61,7 @@ struct RpcFixture : FRT_Invokable {
ASSERT_TRUE(back_conn.load() == nullptr);
back_conn.store(req->GetConnection());
ASSERT_TRUE(back_conn.load() != nullptr);
- back_conn.load()->AddRef();
+ back_conn.load()->internal_addref();
}
FRT_Target *meta_connect(uint32_t port) {
auto *target = orb.Get2WayTarget(fmt("tcp/localhost:%u", port).c_str());
@@ -69,7 +69,7 @@ struct RpcFixture : FRT_Invokable {
req->SetMethodName("connect");
target->InvokeSync(req, 300.0);
ASSERT_TRUE(req->CheckReturnTypes(""));
- req->SubRef();
+ req->internal_subref();
return target;
};
static int check_result(FRT_RPCRequest *req, uint64_t expect) {
@@ -81,7 +81,7 @@ struct RpcFixture : FRT_Invokable {
ASSERT_EQUAL(ret, expect);
++num_ok;
}
- req->SubRef();
+ req->internal_subref();
return num_ok;
}
static int verify_rpc(FNET_Connection *conn) {
@@ -101,7 +101,7 @@ struct RpcFixture : FRT_Invokable {
int verify_rpc(FRT_Target *target, uint32_t port) {
auto *my_target = connect(port);
int num_ok = verify_rpc(target) + verify_rpc(my_target) + verify_rpc(back_conn.load());
- my_target->SubRef();
+ my_target->internal_subref();
return num_ok;
}
};
@@ -135,8 +135,8 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
EXPECT_EQUAL(RpcFixture::verify_rpc(target), 0); // outgoing 2way target should be closed
EXPECT_EQUAL(RpcFixture::verify_rpc(client_target), 1); // pure client target should not be closed
TEST_BARRIER(); // #5
- target->SubRef();
- client_target->SubRef();
+ target->internal_subref();
+ client_target->internal_subref();
} else if (thread_id == 1) { // server 2 (talks to client 2)
auto self = std::make_unique<RpcFixture>(f1);
f3 = self->port();
@@ -146,7 +146,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #3
TEST_BARRIER(); // #4
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
} else if (thread_id == 2) { // client 1 (talks to server 1)
auto self = std::make_unique<RpcFixture>(f1);
f4 = self->port();
@@ -165,7 +165,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #4
EXPECT_EQUAL(self->verify_rpc(target, f2), 0);
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
} else { // client 2 (talks to server 2)
ASSERT_EQUAL(thread_id, 3u);
auto self = std::make_unique<RpcFixture>(f1);
@@ -179,7 +179,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #4
EXPECT_EQUAL(self->verify_rpc(target, f3), 3);
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
}
}
diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp
index 2ac706369ae..d6c42ef7790 100644
--- a/fnet/src/tests/frt/method_pt/method_pt.cpp
+++ b/fnet/src/tests/frt/method_pt/method_pt.cpp
@@ -174,7 +174,7 @@ void finiTest() {
delete _complexHandler;
delete _mediumHandler;
delete _simpleHandler;
- _target->SubRef();
+ _target->internal_subref();
_server.reset();
}
@@ -187,7 +187,7 @@ TEST("method pt") {
//-------------------------------- MEDIUM
- req->SubRef();
+ req->internal_subref();
req = FRT_Supervisor::AllocRPCRequest();
req->SetMethodName("mediumMethod");
_target->InvokeSync(req, 60.0);
@@ -195,7 +195,7 @@ TEST("method pt") {
//-------------------------------- COMPLEX
- req->SubRef();
+ req->internal_subref();
req = FRT_Supervisor::AllocRPCRequest();
req->SetMethodName("complexMethod");
_target->InvokeSync(req, 60.0);
@@ -213,7 +213,7 @@ TEST("method pt") {
fprintf(stderr, "Object inheritance NOT ok for method handlers\n");
}
- req->SubRef();
+ req->internal_subref();
}
//-------------------------------------------------------------
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index 41bfb7d06a6..74f69541d8b 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -125,8 +125,8 @@ void perform_test(size_t thread_id, Client &client, Result &result, bool vital =
BenchmarkTimer::benchmark(invoke, invoke, 0.5);
EXPECT_GREATER_EQUAL(seq, loop_cnt);
result.req_per_sec[thread_id] = double(loop_cnt) / t;
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
TEST_BARRIER();
if (thread_id == 0) {
result.print();
diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
index d15fca93c0b..812f0a57c5e 100644
--- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
@@ -86,7 +86,7 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) {
auto invoke = [&seq, &target, &req, &fixture, reconnect](){
TT_Sample sample(req_tag);
if (reconnect) {
- target->SubRef();
+ target->internal_subref();
target = fixture.connect();
}
req = fixture.orb.AllocRPCRequest(req);
@@ -101,8 +101,8 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) {
auto before = TimeTracer::now();
double t = BenchmarkTimer::benchmark(invoke, 5.0);
auto after = TimeTracer::now();
- target->SubRef();
- req->SubRef();
+ target->internal_subref();
+ req->internal_subref();
auto stats = TimeTracer::extract().by_time(before, after).by_tag(req_tag.id()).get();
ASSERT_TRUE(stats.size() > 0);
std::sort(stats.begin(), stats.end(), DurationCmp());
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index 17c38ab6e3a..9a0f1778cb6 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -31,14 +31,14 @@ struct Server : public FRT_Invokable
void rpc_hook(FRT_RPCRequest *req) {
FNET_Connection *conn = req->GetConnection();
- conn->AddRef(); // need to keep it alive
+ conn->internal_addref(); // need to keep it alive
req->Detach();
req->Return(); // will free request channel
FRT_RPCRequest *r = orb.AllocRPCRequest();
r->SetMethodName("frt.rpc.ping");
// might re-use request channel before it is unlinked from hashmap
orb.InvokeAsync(orb.GetTransport(), conn, r, 5.0, &receptor);
- conn->SubRef(); // invocation will now keep the connection alive as needed
+ conn->internal_subref(); // invocation will now keep the connection alive as needed
}
};
@@ -61,11 +61,11 @@ TEST("detach return invoke") {
}
std::this_thread::sleep_for(10ms);
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
if (receptor.req.load() != nullptr) {
EXPECT_TRUE(!receptor.req.load()->IsError());
- receptor.req.load()->SubRef();
+ receptor.req.load()->internal_subref();
}
EXPECT_TRUE(receptor.req.load() != nullptr);
};
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index e930c1252bf..f06c7428c22 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -65,7 +65,7 @@ public:
}
~MyReq() {
if (_req != nullptr) {
- _req->SubRef();
+ _req->internal_subref();
}
}
MyReq(const MyReq &rhs) = delete;
@@ -331,7 +331,7 @@ public:
}
~Fixture() {
- _target->SubRef();
+ _target->internal_subref();
}
};
@@ -410,7 +410,7 @@ TEST_F("require that a bad target gives connection error", Fixture()) {
{
FRT_Target *bad_target = f1.make_bad_target();
bad_target->InvokeSync(req.borrow(), timeout);
- bad_target->SubRef();
+ bad_target->internal_subref();
}
EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_CONNECTION);
}
diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp
index 2ccb44d03cb..94f57a136a4 100644
--- a/fnet/src/tests/frt/rpc/sharedblob.cpp
+++ b/fnet/src/tests/frt/rpc/sharedblob.cpp
@@ -122,7 +122,7 @@ struct ServerSampler : public FRT_Invokable
dataSet.sample(*req->GetReturn()); // server return before drop
// keep request to sample return after drop
- req->AddRef();
+ req->internal_addref();
serverReq = req;
}
};
@@ -176,7 +176,7 @@ TEST("testExplicitShared") {
req->GetParams()->AddSharedData(&blob);
EXPECT_EQUAL(4, blob.refcnt);
- req->SubRef();
+ req->internal_subref();
EXPECT_EQUAL(1, blob.refcnt);
}
@@ -262,10 +262,10 @@ TEST("testImplicitShared") {
}
if (serverSampler.serverReq != 0) {
- serverSampler.serverReq->SubRef();
+ serverSampler.serverReq->internal_subref();
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index 00075cb75dd..92fe6526a10 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -63,9 +63,9 @@ TEST("info") {
fprintf(stderr, "FD_SETSIZE: %d\n", l[2]._intval32);
fprintf(stderr, "sizeof(FRT_RPCRequest): %d\n", l[3]._intval32);
- target->SubRef();
- local_info->SubRef();
- remote_info->SubRef();
+ target->internal_subref();
+ local_info->internal_subref();
+ remote_info->internal_subref();
};
TEST("size of important objects")
diff --git a/fnet/src/tests/printstuff/printstuff_test.cpp b/fnet/src/tests/printstuff/printstuff_test.cpp
index a9621728c5a..bd76ad29405 100644
--- a/fnet/src/tests/printstuff/printstuff_test.cpp
+++ b/fnet/src/tests/printstuff/printstuff_test.cpp
@@ -37,7 +37,7 @@ TEST("rpc packets in a queue") {
q2.QueuePacket(&req->getStash().create<FRT_RPCRequestPacket>(req, 0, false), FNET_Context());
q2.Print();
}
- req->SubRef();
+ req->internal_subref();
}
TEST("info") {
diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
index a363b1df4c2..eaf2fd71bde 100644
--- a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
+++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
@@ -113,9 +113,9 @@ TEST_FF("transport layers can be run with transport debugger", Fixture(), vespal
EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT);
ASSERT_TRUE(req6->CheckReturnTypes("l"));
EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u);
- target->SubRef();
- req4->SubRef();
- req6->SubRef();
+ target->internal_subref();
+ req4->internal_subref();
+ req6->internal_subref();
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index e344f2a22a6..fef8a6bf01b 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -63,7 +63,7 @@ struct DoHandshakeWork : vespalib::Executor::Task {
DoHandshakeWork(FNET_Connection *conn_in, vespalib::CryptoSocket *socket_in)
: conn(conn_in), socket(socket_in)
{
- conn->AddRef();
+ conn->internal_addref();
}
void run() override {
socket->do_handshake_work();
@@ -82,7 +82,7 @@ FNET_Connection::ResolveHandler::ResolveHandler(FNET_Connection *conn)
: connection(conn),
address()
{
- connection->AddRef();
+ connection->internal_addref();
}
void
@@ -94,7 +94,7 @@ FNET_Connection::ResolveHandler::handle_result(vespalib::SocketAddress result)
FNET_Connection::ResolveHandler::~ResolveHandler()
{
- connection->SubRef();
+ connection->internal_subref();
}
@@ -149,10 +149,9 @@ FNET_Connection::SetState(State state)
}
if ( ! toDelete.empty() ) {
- for (const FNET_Channel::UP & ch : toDelete) {
- (void) ch;
- SubRef_NoLock();
- }
+ const uint32_t cnt = toDelete.size();
+ const uint32_t reserve = 1;
+ internal_subref(cnt, reserve);
}
}
@@ -185,14 +184,14 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
_channels.Unregister(channel);
if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) {
- SubRef_NoLock();
+ internal_subref(1, 1);
toDelete.reset(channel);
}
}
} else if (CanAcceptChannels() && IsFromPeer(chid)) { // open new channel
FNET_Channel::UP newChannel(new FNET_Channel(chid, this));
channel = newChannel.get();
- AddRef_NoLock();
+ internal_addref();
BeforeCallback(guard, channel);
if (_serverAdapter->InitChannel(channel, pcode)) {
@@ -203,7 +202,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
AfterCallback(guard);
if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) {
- SubRef_NoLock();
+ internal_subref(1, 1);
} else if (hp_rc == FNET_IPacketHandler::FNET_KEEP_CHANNEL) {
_channels.Register(newChannel.release());
} else {
@@ -212,7 +211,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
} else {
AfterCallback(guard);
- SubRef_NoLock();
+ internal_subref(1, 1);
guard.unlock();
LOG(debug, "Connection(%s): channel init failed", GetSpec());
@@ -489,8 +488,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_myQueue(256),
_output(0),
_channels(),
- _callbackTarget(nullptr),
- _cleanup(nullptr)
+ _callbackTarget(nullptr)
{
assert(_socket && (_socket->get_fd() >= 0));
_num_connections.fetch_add(1, std::memory_order_relaxed);
@@ -520,8 +518,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_myQueue(256),
_output(0),
_channels(),
- _callbackTarget(nullptr),
- _cleanup(nullptr)
+ _callbackTarget(nullptr)
{
_num_connections.fetch_add(1, std::memory_order_relaxed);
}
@@ -530,7 +527,6 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
FNET_Connection::~FNET_Connection()
{
assert(!_resolve_handler);
- assert(_cleanup == nullptr);
_num_connections.fetch_sub(1, std::memory_order_relaxed);
}
@@ -576,12 +572,6 @@ FNET_Connection::handle_handshake_act()
return ((GetState() == FNET_CONNECTING) && handshake());
}
-void
-FNET_Connection::SetCleanupHandler(FNET_IConnectionCleanupHandler *handler)
-{
- _cleanup = handler;
-}
-
FNET_Channel*
FNET_Connection::OpenChannel(FNET_IPacketHandler *handler,
@@ -598,7 +588,7 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler,
*chid = newChannel->GetID();
}
WaitCallback(guard, nullptr);
- AddRef_NoLock();
+ internal_addref();
ret = newChannel.release();
_channels.Register(ret);
}
@@ -614,7 +604,7 @@ FNET_Connection::OpenChannel()
{
std::lock_guard<std::mutex> guard(_ioc_lock);
chid = GetNextID();
- AddRef_NoLock();
+ internal_addref();
}
return new FNET_Channel(chid, this);
}
@@ -633,18 +623,20 @@ void
FNET_Connection::FreeChannel(FNET_Channel *channel)
{
delete channel;
- SubRef_HasLock(std::unique_lock<std::mutex>(_ioc_lock));
+ internal_subref();
}
void
FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel)
{
- std::unique_lock<std::mutex> guard(_ioc_lock);
- WaitCallback(guard, channel);
- _channels.Unregister(channel);
- SubRef_HasLock(std::move(guard));
- delete channel;
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ WaitCallback(guard, channel);
+ _channels.Unregister(channel);
+ delete channel;
+ }
+ internal_subref();
}
@@ -668,7 +660,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
_writeWork++;
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
if ((writeWork == 0) && (GetState() == FNET_CONNECTED)) {
- AddRef_NoLock();
+ internal_addref();
guard.unlock();
Owner()->EnableWrite(this, /* needRef = */ false);
}
@@ -686,16 +678,6 @@ FNET_Connection::Sync()
void
-FNET_Connection::CleanupHook()
-{
- if (_cleanup != nullptr) {
- _cleanup->Cleanup(this);
- _cleanup = nullptr;
- }
-}
-
-
-void
FNET_Connection::Close()
{
_resolve_handler.reset();
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 10cf74e79de..80927fd375c 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -21,30 +21,6 @@ class FNET_IPacketHandler;
namespace vespalib::net { class ConnectionAuthContext; }
/**
- * Interface implemented by objects that want to perform connection
- * cleanup. Use the SetCleanupHandler method to register with a
- * connection. Currently, there can only be one cleanup handler per
- * connection.
- **/
-class FNET_IConnectionCleanupHandler
-{
-public:
-
- /**
- * Destructor. No cleanup needed for base class.
- */
- virtual ~FNET_IConnectionCleanupHandler(void) {}
-
- /**
- * Perform connection cleanup.
- *
- * @param conn the connection
- **/
- virtual void Cleanup(FNET_Connection *conn) = 0;
-};
-
-
-/**
* This class represents a single connection with another
* computer. The binary format on a connection is defined by the
* PacketStreamer given to the constructor. Each connection object may
@@ -115,8 +91,6 @@ private:
FNET_ChannelLookup _channels; // channel 'DB'
FNET_Channel *_callbackTarget; // target of current callback
- FNET_IConnectionCleanupHandler *_cleanup; // cleanup handler
-
std::unique_ptr<vespalib::net::ConnectionAuthContext> _auth_context;
static std::atomic<uint64_t> _num_connections; // total number of connections
@@ -377,14 +351,6 @@ public:
**/
bool handle_handshake_act() override;
- /**
- * Register a cleanup handler to be invoked when this connection is
- * about to be destructed.
- *
- * @param handler the cleanup handler
- **/
- void SetCleanupHandler(FNET_IConnectionCleanupHandler *handler);
-
/**
* Open a new channel on this connection. This method will return
@@ -468,14 +434,6 @@ public:
/**
- * Invoked by the io component superclass before the object is
- * destructed. Will invoke the Cleanup method on the cleanup handler
- * for this connection, if present.
- **/
- void CleanupHook() override;
-
-
- /**
* Close this connection immidiately. NOTE: this method should only
* be called by the transport thread.
**/
diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp
index d4b35720a8d..421bd0c4d0b 100644
--- a/fnet/src/vespa/fnet/frt/invoker.cpp
+++ b/fnet/src/vespa/fnet/frt/invoker.cpp
@@ -98,7 +98,7 @@ FRT_RPCInvoker::HandleDone(bool freeChannel)
}
// send response to client or get rid of it
if (_noReply || (_req->GetErrorCode() == FRTE_RPC_BAD_REQUEST))
- _req->SubRef();
+ _req->internal_subref();
else
ch->Send(_req->CreateReplyPacket());
@@ -128,7 +128,7 @@ void FRT_HookInvoker::Invoke()
_req->SetDetachedPT(&detached);
(_hook->GetHandler()->*_hook->GetMethod())(_req);
assert(!detached);
- _req->SubRef();
+ _req->internal_subref();
}
void
diff --git a/fnet/src/vespa/fnet/frt/packets.cpp b/fnet/src/vespa/fnet/frt/packets.cpp
index 134a869eafb..5e600c8caa3 100644
--- a/fnet/src/vespa/fnet/frt/packets.cpp
+++ b/fnet/src/vespa/fnet/frt/packets.cpp
@@ -14,7 +14,7 @@ FRT_RPCPacket::Free()
{
if (_ownsRef) {
_req->DiscardBlobs();
- _req->SubRef();
+ _req->internal_subref();
}
}
diff --git a/fnet/src/vespa/fnet/frt/reflection.cpp b/fnet/src/vespa/fnet/frt/reflection.cpp
index af7fa069eb9..c09057ea675 100644
--- a/fnet/src/vespa/fnet/frt/reflection.cpp
+++ b/fnet/src/vespa/fnet/frt/reflection.cpp
@@ -153,7 +153,7 @@ FRT_ReflectionBuilder::FRT_ReflectionBuilder(FRT_Supervisor *supervisor)
FRT_ReflectionBuilder::~FRT_ReflectionBuilder()
{
Flush();
- _req->SubRef();
+ _req->internal_subref();
}
diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.cpp b/fnet/src/vespa/fnet/frt/rpcrequest.cpp
index ac6dbb26ad6..6870a275ab1 100644
--- a/fnet/src/vespa/fnet/frt/rpcrequest.cpp
+++ b/fnet/src/vespa/fnet/frt/rpcrequest.cpp
@@ -10,7 +10,6 @@ FRT_RPCRequest::FRT_RPCRequest()
_context(),
_params(_stash),
_return(_stash),
- _refcnt(1),
_completed(0),
_errorCode(FRTE_NO_ERROR),
_errorMessageLen(0),
@@ -19,14 +18,10 @@ FRT_RPCRequest::FRT_RPCRequest()
_methodName(nullptr),
_detachedPT(nullptr),
_abortHandler(nullptr),
- _returnHandler(nullptr),
- _cleanupHandler(nullptr)
+ _returnHandler(nullptr)
{ }
-FRT_RPCRequest::~FRT_RPCRequest()
-{
- assert(_refcnt == 0);
-}
+FRT_RPCRequest::~FRT_RPCRequest() = default;
void
FRT_RPCRequest::SetError(uint32_t errorCode, const char *errorMessage, uint32_t errorMessageLen)
@@ -87,18 +82,9 @@ FRT_RPCRequest::GetConnection() {
return _returnHandler->GetConnection();
}
-void
-FRT_RPCRequest::Cleanup() {
- if (_cleanupHandler != nullptr) {
- _cleanupHandler->HandleCleanup();
- _cleanupHandler = nullptr;
- }
-}
void
FRT_RPCRequest::Reset() {
- assert(_refcnt <= 1);
- Cleanup();
_context = FNET_Context();
_params.Reset();
_return.Reset();
@@ -118,7 +104,7 @@ FRT_RPCRequest::Reset() {
bool
FRT_RPCRequest::Recycle()
{
- if (_refcnt > 1 || _errorCode != FRTE_NO_ERROR)
+ if (count_refs() > 1 || _errorCode != FRTE_NO_ERROR)
return false;
Reset();
return true;
@@ -126,18 +112,6 @@ FRT_RPCRequest::Recycle()
void
-FRT_RPCRequest::SubRef()
-{
- int oldVal = _refcnt.fetch_sub(1);
- assert(oldVal > 0);
- if (oldVal == 1) {
- Reset();
- delete this;
- }
-}
-
-
-void
FRT_RPCRequest::Print(uint32_t indent)
{
printf("%*sFRT_RPCRequest {\n", indent, "");
@@ -163,7 +137,7 @@ FRT_RPCRequest::CreateRequestPacket(bool wantReply)
flags |= FLAG_FRT_RPC_LITTLE_ENDIAN;
if (wantReply)
- AddRef();
+ internal_addref();
else
flags |= FLAG_FRT_RPC_NOREPLY;
diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.h b/fnet/src/vespa/fnet/frt/rpcrequest.h
index a095c274687..72dae4a6af1 100644
--- a/fnet/src/vespa/fnet/frt/rpcrequest.h
+++ b/fnet/src/vespa/fnet/frt/rpcrequest.h
@@ -6,6 +6,7 @@
#include "error.h"
#include <vespa/fnet/context.h>
#include <vespa/vespalib/util/stash.h>
+#include <vespa/vespalib/util/ref_counted.h>
#include <atomic>
class FNET_Packet;
@@ -37,20 +38,7 @@ public:
};
-class FRT_ICleanupHandler
-{
-public:
-
- /**
- * Destructor. No cleanup needed for base class.
- */
- virtual ~FRT_ICleanupHandler(void) {}
-
- virtual void HandleCleanup() = 0;
-};
-
-
-class FRT_RPCRequest
+class FRT_RPCRequest : public vespalib::enable_ref_counted
{
private:
using Stash = vespalib::Stash;
@@ -58,7 +46,6 @@ private:
FNET_Context _context;
FRT_Values _params;
FRT_Values _return;
- std::atomic<int> _refcnt;
std::atomic<int> _completed;
uint32_t _errorCode;
uint32_t _errorMessageLen;
@@ -69,7 +56,6 @@ private:
bool *_detachedPT;
FRT_IAbortHandler *_abortHandler;
FRT_IReturnHandler *_returnHandler;
- FRT_ICleanupHandler *_cleanupHandler;
public:
FRT_RPCRequest(const FRT_RPCRequest &) = delete;
@@ -86,9 +72,6 @@ public:
_return.DiscardBlobs();
}
- void AddRef() { _refcnt.fetch_add(1); }
- void SubRef();
-
void SetContext(FNET_Context context) { _context = context; }
FNET_Context GetContext() { return _context; }
@@ -137,10 +120,8 @@ public:
void SetAbortHandler(FRT_IAbortHandler *handler) { _abortHandler = handler; }
void SetReturnHandler(FRT_IReturnHandler *handler) { _returnHandler = handler; }
- void SetCleanupHandler(FRT_ICleanupHandler *handler) { _cleanupHandler = handler; }
bool Abort();
void Return();
FNET_Connection *GetConnection();
- void Cleanup();
};
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index 966c606bf97..7d6b4d727c7 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -31,7 +31,7 @@ FRT_Supervisor::~FRT_Supervisor()
{
_transport->detach(this);
if (_connector != nullptr) {
- _connector->SubRef();
+ _connector->internal_subref();
}
}
@@ -99,7 +99,7 @@ FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein)
if (tradein->Recycle()) {
return tradein;
}
- tradein->SubRef();
+ tradein->internal_subref();
}
return new FRT_RPCRequest();
}
@@ -113,7 +113,7 @@ FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req)
ch->Send(req->CreateRequestPacket(false));
ch->Free();
} else {
- req->SubRef();
+ req->internal_subref();
}
}
diff --git a/fnet/src/vespa/fnet/frt/target.cpp b/fnet/src/vespa/fnet/frt/target.cpp
index 1645fba91ee..47baa95b9e2 100644
--- a/fnet/src/vespa/fnet/frt/target.cpp
+++ b/fnet/src/vespa/fnet/frt/target.cpp
@@ -6,7 +6,6 @@
FRT_Target::~FRT_Target()
{
- assert(_refcnt == 0);
FNET_Connection * conn(_conn);
_conn = nullptr;
if (conn != nullptr) {
diff --git a/fnet/src/vespa/fnet/frt/target.h b/fnet/src/vespa/fnet/frt/target.h
index ac6bf377a14..773daaca8b1 100644
--- a/fnet/src/vespa/fnet/frt/target.h
+++ b/fnet/src/vespa/fnet/frt/target.h
@@ -3,16 +3,16 @@
#pragma once
#include <vespa/fnet/connection.h>
+#include <vespa/vespalib/util/ref_counted.h>
#include <atomic>
class FNET_Scheduler;
class FRT_RPCRequest;
class FRT_IRequestWait;
-class FRT_Target
+class FRT_Target : public vespalib::enable_ref_counted
{
private:
- std::atomic<int> _refcnt;
FNET_Scheduler *_scheduler;
FNET_Connection *_conn;
@@ -21,23 +21,12 @@ private:
public:
FRT_Target(FNET_Scheduler *scheduler, FNET_Connection *conn)
- : _refcnt(1),
- _scheduler(scheduler),
+ : _scheduler(scheduler),
_conn(conn) {}
~FRT_Target();
FNET_Connection *GetConnection() const { return _conn; }
-
- void AddRef() { _refcnt.fetch_add(1); }
- void SubRef() {
- if (_refcnt.fetch_sub(1) == 1) {
- delete this;
- }
- }
-
- int GetRefCnt() const { return _refcnt; }
-
bool IsValid() {
return ((_conn != nullptr) &&
(_conn->GetState() <= FNET_Connection::FNET_CONNECTED));
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index f08718c0c5c..c4fc7d859d4 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -16,7 +16,6 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner,
_ioc_spec(spec),
_flags(shouldTimeOut),
_ioc_socket_fd(socket_fd),
- _ioc_refcnt(1),
_ioc_timestamp(vespalib::steady_clock::now()),
_ioc_lock(),
_ioc_cond()
@@ -39,58 +38,6 @@ FNET_IOComponent::UpdateTimeOut() {
_ioc_owner->UpdateTimeOut(this);
}
-void
-FNET_IOComponent::AddRef()
-{
- std::lock_guard<std::mutex> guard(_ioc_lock);
- assert(_ioc_refcnt > 0);
- _ioc_refcnt++;
-}
-
-
-void
-FNET_IOComponent::AddRef_NoLock()
-{
- assert(_ioc_refcnt > 0);
- _ioc_refcnt++;
-}
-
-
-void
-FNET_IOComponent::SubRef()
-{
- {
- std::lock_guard<std::mutex> guard(_ioc_lock);
- assert(_ioc_refcnt > 0);
- if (--_ioc_refcnt > 0) {
- return;
- }
- }
- CleanupHook();
- delete this;
-}
-
-
-void
-FNET_IOComponent::SubRef_HasLock(std::unique_lock<std::mutex> guard)
-{
- assert(_ioc_refcnt > 0);
- if (--_ioc_refcnt > 0) {
- return;
- }
- guard.unlock();
- CleanupHook();
- delete this;
-}
-
-
-void
-FNET_IOComponent::SubRef_NoLock()
-{
- assert(_ioc_refcnt > 1);
- _ioc_refcnt--;
-}
-
void
FNET_IOComponent::attach_selector(Selector &selector)
@@ -141,8 +88,3 @@ FNET_IOComponent::handle_handshake_act()
{
return true;
}
-
-void
-FNET_IOComponent::CleanupHook()
-{
-}
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index 106e31c9236..b88b2700db5 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -4,6 +4,7 @@
#include "scheduler.h"
#include <vespa/vespalib/net/selector.h>
+#include <vespa/vespalib/util/ref_counted.h>
#include <mutex>
#include <condition_variable>
#include <chrono>
@@ -18,7 +19,7 @@ class FNET_Config;
* Components do IO against the network and that they use sockets to
* perform that IO.
**/
-class FNET_IOComponent
+class FNET_IOComponent : public vespalib::enable_ref_counted
{
friend class FNET_TransportThread;
@@ -46,7 +47,6 @@ protected:
std::string _ioc_spec; // connect/listen spec
Flags _flags; // Compressed representation of boolean flags;
int _ioc_socket_fd; // source of events.
- uint32_t _ioc_refcnt; // reference counter
vespalib::steady_time _ioc_timestamp; // last I/O activity
std::mutex _ioc_lock; // synchronization
std::condition_variable _ioc_cond; // synchronization
@@ -88,43 +88,6 @@ public:
std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_ioc_lock); }
/**
- * Allocate a reference to this component. This method locks the
- * object to protect the reference counter.
- **/
- void AddRef();
-
-
- /**
- * Allocate a reference to this component without locking the
- * object. Caller already has lock on object.
- **/
- void AddRef_NoLock();
-
-
- /**
- * Free a reference to this component. This method locks the object
- * to protect the reference counter.
- **/
- void SubRef();
-
-
- /**
- * Free a reference to this component. This method uses locking to
- * protect the reference counter, but assumes that the lock has
- * already been obtained when this method is called.
- **/
- void SubRef_HasLock(std::unique_lock<std::mutex> guard);
-
-
- /**
- * Free a reference to this component without locking the
- * object. NOTE: this method may only be called on objects with more
- * than one reference.
- **/
- void SubRef_NoLock();
-
-
- /**
* @return the owning TransportThread object.
**/
FNET_TransportThread *Owner() { return _ioc_owner; }
@@ -217,13 +180,6 @@ public:
**/
virtual bool handle_handshake_act();
- /**
- * This method is called by the SubRef methods just before the
- * object is deleted. It may be used to perform cleanup tasks that
- * must be done before the destructor is invoked.
- **/
- virtual void CleanupHook();
-
/**
* Close this component immediately. NOTE: this method should only
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 970dc40150f..0b0df02c04c 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -107,7 +107,7 @@ FNET_TransportThread::FlushDeleteList()
FNET_IOComponent *tmp = _deleteList;
_deleteList = tmp->_ioc_next;
assert(tmp->_flags._ioc_delete);
- tmp->SubRef();
+ tmp->internal_subref();
}
}
@@ -143,12 +143,12 @@ FNET_TransportThread::DiscardEvent(FNET_ControlPacket *cpacket,
switch (cpacket->GetCommand()) {
case FNET_ControlPacket::FNET_CMD_IOC_ADD:
context._value.IOC->Close();
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
break;
case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE:
case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT:
case FNET_ControlPacket::FNET_CMD_IOC_CLOSE:
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
break;
}
}
@@ -173,7 +173,7 @@ FNET_TransportThread::handle_close_cmd(FNET_IOComponent *ioc)
{
if (ioc->_flags._ioc_added) {
RemoveComponent(ioc);
- ioc->SubRef();
+ ioc->internal_subref();
}
ioc->Close();
AddDeleteComponent(ioc);
@@ -288,7 +288,7 @@ FNET_TransportThread::Listen(const char *spec, FNET_IPacketStreamer *streamer,
if (server_socket.valid() && server_socket.set_blocking(false)) {
FNET_Connector *connector = new FNET_Connector(this, streamer, serverAdapter, spec, std::move(server_socket));
connector->EnableReadEvent(true);
- connector->AddRef_NoLock();
+ connector->internal_addref();
Add(connector, /* needRef = */ false);
return connector;
}
@@ -314,7 +314,7 @@ void
FNET_TransportThread::Add(FNET_IOComponent *comp, bool needRef)
{
if (needRef) {
- comp->AddRef();
+ comp->internal_addref();
}
PostEvent(&FNET_ControlPacket::IOCAdd, FNET_Context(comp));
}
@@ -324,7 +324,7 @@ void
FNET_TransportThread::EnableWrite(FNET_IOComponent *comp, bool needRef)
{
if (needRef) {
- comp->AddRef();
+ comp->internal_addref();
}
PostEvent(&FNET_ControlPacket::IOCEnableWrite, FNET_Context(comp));
}
@@ -333,7 +333,7 @@ void
FNET_TransportThread::handshake_act(FNET_IOComponent *comp, bool needRef)
{
if (needRef) {
- comp->AddRef();
+ comp->internal_addref();
}
PostEvent(&FNET_ControlPacket::IOCHandshakeACT, FNET_Context(comp));
}
@@ -342,7 +342,7 @@ void
FNET_TransportThread::Close(FNET_IOComponent *comp, bool needRef)
{
if (needRef) {
- comp->AddRef();
+ comp->internal_addref();
}
PostEvent(&FNET_ControlPacket::IOCClose, FNET_Context(comp));
}
@@ -449,7 +449,7 @@ FNET_TransportThread::handle_wakeup()
}
if (context._value.IOC->_flags._ioc_delete) {
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
continue;
}
@@ -460,14 +460,14 @@ FNET_TransportThread::handle_wakeup()
case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE:
context._value.IOC->EnableWriteEvent(true);
if (context._value.IOC->HandleWriteEvent()) {
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
} else {
handle_close_cmd(context._value.IOC);
}
break;
case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT:
if (context._value.IOC->handle_handshake_act()) {
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
} else {
handle_close_cmd(context._value.IOC);
}
@@ -577,7 +577,7 @@ FNET_TransportThread::endEventLoop() {
component = component->_ioc_next;
RemoveComponent(tmp);
tmp->Close();
- tmp->SubRef();
+ tmp->internal_subref();
}
assert(_componentsHead == nullptr &&
_componentsTail == nullptr &&
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 1744a3d60e5..6047d4e3482 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -97,7 +97,7 @@ private:
/**
- * Delete (call SubRef on) all IO Components in the delete list.
+ * Delete (call internal_subref on) all IO Components in the delete list.
**/
void FlushDeleteList();
@@ -277,7 +277,7 @@ public:
* @param needRef should be set to false if the caller of this
* method already has obtained an extra reference to the
* component. If this flag is true, this method will call the
- * AddRef method on the component.
+ * internal_addref method on the component.
**/
void Add(FNET_IOComponent *comp, bool needRef = true);
@@ -294,7 +294,7 @@ public:
* @param needRef should be set to false if the caller of this
* method already has obtained an extra reference to the
* component. If this flag is true, this method will call the
- * AddRef method on the component.
+ * internal_addref method on the component.
**/
void EnableWrite(FNET_IOComponent *comp, bool needRef = true);
@@ -312,7 +312,7 @@ public:
* @param needRef should be set to false if the caller of this
* method already has obtained an extra reference to the
* component. If this flag is true, this method will call the
- * AddRef method on the component.
+ * internal_addref method on the component.
**/
void handshake_act(FNET_IOComponent *comp, bool needRef = true);
@@ -330,7 +330,7 @@ public:
* @param needRef should be set to false if the caller of this
* method already has obtained an extra reference to the
* component. If this flag is true, this method will call the
- * AddRef method on the component.
+ * internal_addref method on the component.
**/
void Close(FNET_IOComponent *comp, bool needRef = true);