diff options
Diffstat (limited to 'fnet/src')
34 files changed, 159 insertions, 365 deletions
diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp index 0176337c466..869a010bfda 100644 --- a/fnet/src/examples/frt/rpc/echo_client.cpp +++ b/fnet/src/examples/frt/rpc/echo_client.cpp @@ -85,8 +85,8 @@ EchoClient::main(int argc, char **argv) } else { printf("Return values != parameters.\n"); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp index b41c40ba29d..c52d48f24eb 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp @@ -102,8 +102,8 @@ MyApp::main(int argc, char **argv) ok = false; } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return ok ? 0 : 1; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp index 37a819c6ee7..5c21f73da7f 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp @@ -27,7 +27,7 @@ void do_callback(FRT_RPCRequest *req) { cb->GetErrorCode(), cb->GetErrorMessage()); } - cb->SubRef(); + cb->internal_subref(); req->Return(); } diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp index acb20a880c9..55168a2acba 100644 --- a/fnet/src/examples/frt/rpc/rpc_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_client.cpp @@ -49,7 +49,7 @@ RPCClient::main(int argc, char **argv) } fprintf(stdout, "\nTesting addFloat method\n"); - req->SubRef(); + req->internal_subref(); req = supervisor.AllocRPCRequest(); req->SetMethodName("addFloat"); req->GetParams()->AddFloat(float1); @@ -65,7 +65,7 @@ RPCClient::main(int argc, char **argv) } fprintf(stdout, "\nTesting addDouble method\n"); - req->SubRef(); + req->internal_subref(); req = supervisor.AllocRPCRequest(); req->SetMethodName("addDouble"); req->GetParams()->AddDouble(double1); @@ -80,8 +80,8 @@ RPCClient::main(int argc, char **argv) req->GetErrorMessage()); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp index 9734342a24e..ab534a254c2 100644 --- a/fnet/src/examples/frt/rpc/rpc_info.cpp +++ b/fnet/src/examples/frt/rpc/rpc_info.cpp @@ -15,16 +15,16 @@ public: void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor) { if ((*req) != nullptr) - (*req)->SubRef(); + (*req)->internal_subref(); (*req) = supervisor->AllocRPCRequest(); } void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2) { if (r1 != nullptr) - r1->SubRef(); + r1->internal_subref(); if (r2 != nullptr) - r2->SubRef(); + r2->internal_subref(); } void DumpMethodInfo(const char *indent, FRT_RPCRequest *info, @@ -130,7 +130,7 @@ RPCInfo::main(int argc, char **argv) m_list->GetErrorMessage()); } FreeReqs(m_list, info); - target->SubRef(); + target->internal_subref(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp index 9f3e90f469a..d56847098d8 100644 --- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp +++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp @@ -113,8 +113,8 @@ RPCClient::run(int argc, char **argv) retCode = 3; } } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return retCode; } diff --git a/fnet/src/examples/ping/pingclient.cpp b/fnet/src/examples/ping/pingclient.cpp index 43296df7e57..b59df31607a 100644 --- a/fnet/src/examples/ping/pingclient.cpp +++ b/fnet/src/examples/ping/pingclient.cpp @@ -86,7 +86,7 @@ PingClient::main(int argc, char **argv) packet->Free(); } if (conn != nullptr) - conn->SubRef(); + conn->internal_subref(); transport.ShutDown(true); return 0; } diff --git a/fnet/src/examples/ping/pingserver.cpp b/fnet/src/examples/ping/pingserver.cpp index fb5b12b66c0..79a67cd18a7 100644 --- a/fnet/src/examples/ping/pingserver.cpp +++ b/fnet/src/examples/ping/pingserver.cpp @@ -51,7 +51,7 @@ PingServer::main(int argc, char **argv) FNET_Connector *listener = transport.Listen(argv[1], &streamer, this); if (listener != nullptr) - listener->SubRef(); + listener->internal_subref(); FNET_SignalShutDown ssd(transport); transport.Main(); diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 9d566cf37a7..d635fea6f94 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -86,25 +86,22 @@ struct BlockingCryptoEngine : public CryptoEngine { //----------------------------------------------------------------------------- -struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { +struct TransportFixture : FNET_IPacketHandler { FNET_SimplePacketStreamer streamer; FNET_Transport transport; Gate conn_lost; - Gate conn_deleted; - TransportFixture() : streamer(nullptr), transport(), - conn_lost(), conn_deleted() - { + TransportFixture() : streamer(nullptr), transport(), conn_lost() { transport.Start(); } TransportFixture(AsyncResolver::HostResolver::SP host_resolver) : streamer(nullptr), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))), - conn_lost(), conn_deleted() + conn_lost() { transport.Start(); } TransportFixture(CryptoEngine::SP crypto) : streamer(nullptr), transport(fnet::TransportConfig().crypto(std::move(crypto))), - conn_lost(), conn_deleted() + conn_lost() { transport.Start(); } @@ -114,14 +111,12 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { packet->Free(); return FNET_FREE_CHANNEL; } - void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); } FNET_Connection *connect(const vespalib::string &spec) { FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer); ASSERT_TRUE(conn != nullptr); if (conn->OpenChannel(this, FNET_Context()) == nullptr) { conn_lost.countDown(); } - conn->SetCleanupHandler(this); return conn; } ~TransportFixture() override { @@ -131,8 +126,26 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { //----------------------------------------------------------------------------- -TEST_MT_FFF("require that normal connect works", 2, - ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60)) +struct ConnCheck { + uint64_t target; + ConnCheck() : target(FNET_Connection::get_num_connections()) { + EXPECT_EQUAL(target, uint64_t(0)); + } + bool at_target() const { return (FNET_Connection::get_num_connections() == target); }; + bool await(duration max_wait) const { + auto until = saturated_add(steady_clock::now(), max_wait); + while (!at_target() && steady_clock::now() < until) { + std::this_thread::sleep_for(1ms); + } + return at_target(); + } + void await() const { + ASSERT_TRUE(await(3600s)); + } +}; + +TEST_MT_FFFF("require that normal connect works", 2, + ServerSocket("tcp/0"), TransportFixture(), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -144,23 +157,23 @@ TEST_MT_FFF("require that normal connect works", 2, TEST_BARRIER(); conn->Owner()->Close(conn); f2.conn_lost.await(); - EXPECT_TRUE(!f2.conn_deleted.await(short_time)); - conn->SubRef(); - f2.conn_deleted.await(); + EXPECT_TRUE(!f3.await(short_time)); + conn->internal_subref(); + f3.await(); } } -TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) { +TEST_FFF("require that bogus connect fail asynchronously", TransportFixture(), ConnCheck(), TimeBomb(60)) { FNET_Connection *conn = f1.connect("invalid"); f1.conn_lost.await(); - EXPECT_TRUE(!f1.conn_deleted.await(short_time)); - conn->SubRef(); - f1.conn_deleted.await(); + EXPECT_TRUE(!f2.await(short_time)); + conn->internal_subref(); + f2.await(); } -TEST_MT_FFFF("require that async close can be called before async resolve completes", 2, - ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()), - TransportFixture(f2), TimeBomb(60)) +TEST_MT_FFFFF("require that async close can be called before async resolve completes", 2, + ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()), + TransportFixture(f2), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -172,16 +185,16 @@ TEST_MT_FFFF("require that async close can be called before async resolve comple conn->Owner()->Close(conn); f3.conn_lost.await(); f2->release_caller(); - EXPECT_TRUE(!f3.conn_deleted.await(short_time)); - conn->SubRef(); - f3.conn_deleted.await(); + EXPECT_TRUE(!f4.await(short_time)); + conn->internal_subref(); + f4.await(); f1.shutdown(); } } -TEST_MT_FFFF("require that async close during async do_handshake_work works", 2, - ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()), - TransportFixture(f2), TimeBomb(60)) +TEST_MT_FFFFF("require that async close during async do_handshake_work works", 2, + ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()), + TransportFixture(f2), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -196,10 +209,10 @@ TEST_MT_FFFF("require that async close during async do_handshake_work works", 2, f3.conn_lost.await(); TEST_BARRIER(); // #1 // verify that pending work keeps relevant objects alive - EXPECT_TRUE(!f3.conn_deleted.await(short_time)); + EXPECT_TRUE(!f4.await(short_time)); EXPECT_TRUE(!f2->handshake_socket_deleted.await(short_time)); f2->handshake_work_exit.countDown(); - f3.conn_deleted.await(); + f4.await(); f2->handshake_socket_deleted.await(); } } diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp index 6286ce65657..5908e6a4982 100644 --- a/fnet/src/tests/connection_spread/connection_spread_test.cpp +++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp @@ -84,9 +84,9 @@ TEST_F("require that connections are spread among transport threads", Fixture) f1.wait_for_components(256, 257); check_threads(f1.client, 8, "client"); check_threads(f1.server, 8, "server"); - listener->SubRef(); + listener->internal_subref(); for (FNET_Connection *conn: connections) { - conn->SubRef(); + conn->internal_subref(); } } diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp index 6325c60413a..c5ca6dc6ce9 100644 --- a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp +++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp @@ -36,7 +36,7 @@ struct RpcFixture : FRT_Invokable { } ~RpcFixture() { if (back_conn.load() != nullptr) { - back_conn.load()->SubRef(); + back_conn.load()->internal_subref(); } } uint32_t port() const { return orb.GetListenPort(); } @@ -61,7 +61,7 @@ struct RpcFixture : FRT_Invokable { ASSERT_TRUE(back_conn.load() == nullptr); back_conn.store(req->GetConnection()); ASSERT_TRUE(back_conn.load() != nullptr); - back_conn.load()->AddRef(); + back_conn.load()->internal_addref(); } FRT_Target *meta_connect(uint32_t port) { auto *target = orb.Get2WayTarget(fmt("tcp/localhost:%u", port).c_str()); @@ -69,7 +69,7 @@ struct RpcFixture : FRT_Invokable { req->SetMethodName("connect"); target->InvokeSync(req, 300.0); ASSERT_TRUE(req->CheckReturnTypes("")); - req->SubRef(); + req->internal_subref(); return target; }; static int check_result(FRT_RPCRequest *req, uint64_t expect) { @@ -81,7 +81,7 @@ struct RpcFixture : FRT_Invokable { ASSERT_EQUAL(ret, expect); ++num_ok; } - req->SubRef(); + req->internal_subref(); return num_ok; } static int verify_rpc(FNET_Connection *conn) { @@ -101,7 +101,7 @@ struct RpcFixture : FRT_Invokable { int verify_rpc(FRT_Target *target, uint32_t port) { auto *my_target = connect(port); int num_ok = verify_rpc(target) + verify_rpc(my_target) + verify_rpc(back_conn.load()); - my_target->SubRef(); + my_target->internal_subref(); return num_ok; } }; @@ -135,8 +135,8 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic EXPECT_EQUAL(RpcFixture::verify_rpc(target), 0); // outgoing 2way target should be closed EXPECT_EQUAL(RpcFixture::verify_rpc(client_target), 1); // pure client target should not be closed TEST_BARRIER(); // #5 - target->SubRef(); - client_target->SubRef(); + target->internal_subref(); + client_target->internal_subref(); } else if (thread_id == 1) { // server 2 (talks to client 2) auto self = std::make_unique<RpcFixture>(f1); f3 = self->port(); @@ -146,7 +146,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #3 TEST_BARRIER(); // #4 TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } else if (thread_id == 2) { // client 1 (talks to server 1) auto self = std::make_unique<RpcFixture>(f1); f4 = self->port(); @@ -165,7 +165,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #4 EXPECT_EQUAL(self->verify_rpc(target, f2), 0); TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } else { // client 2 (talks to server 2) ASSERT_EQUAL(thread_id, 3u); auto self = std::make_unique<RpcFixture>(f1); @@ -179,7 +179,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #4 EXPECT_EQUAL(self->verify_rpc(target, f3), 3); TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } } diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp index 2ac706369ae..d6c42ef7790 100644 --- a/fnet/src/tests/frt/method_pt/method_pt.cpp +++ b/fnet/src/tests/frt/method_pt/method_pt.cpp @@ -174,7 +174,7 @@ void finiTest() { delete _complexHandler; delete _mediumHandler; delete _simpleHandler; - _target->SubRef(); + _target->internal_subref(); _server.reset(); } @@ -187,7 +187,7 @@ TEST("method pt") { //-------------------------------- MEDIUM - req->SubRef(); + req->internal_subref(); req = FRT_Supervisor::AllocRPCRequest(); req->SetMethodName("mediumMethod"); _target->InvokeSync(req, 60.0); @@ -195,7 +195,7 @@ TEST("method pt") { //-------------------------------- COMPLEX - req->SubRef(); + req->internal_subref(); req = FRT_Supervisor::AllocRPCRequest(); req->SetMethodName("complexMethod"); _target->InvokeSync(req, 60.0); @@ -213,7 +213,7 @@ TEST("method pt") { fprintf(stderr, "Object inheritance NOT ok for method handlers\n"); } - req->SubRef(); + req->internal_subref(); } //------------------------------------------------------------- diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 41bfb7d06a6..74f69541d8b 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -125,8 +125,8 @@ void perform_test(size_t thread_id, Client &client, Result &result, bool vital = BenchmarkTimer::benchmark(invoke, invoke, 0.5); EXPECT_GREATER_EQUAL(seq, loop_cnt); result.req_per_sec[thread_id] = double(loop_cnt) / t; - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); TEST_BARRIER(); if (thread_id == 0) { result.print(); diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp index d15fca93c0b..812f0a57c5e 100644 --- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp +++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp @@ -86,7 +86,7 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) { auto invoke = [&seq, &target, &req, &fixture, reconnect](){ TT_Sample sample(req_tag); if (reconnect) { - target->SubRef(); + target->internal_subref(); target = fixture.connect(); } req = fixture.orb.AllocRPCRequest(req); @@ -101,8 +101,8 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) { auto before = TimeTracer::now(); double t = BenchmarkTimer::benchmark(invoke, 5.0); auto after = TimeTracer::now(); - target->SubRef(); - req->SubRef(); + target->internal_subref(); + req->internal_subref(); auto stats = TimeTracer::extract().by_time(before, after).by_tag(req_tag.id()).get(); ASSERT_TRUE(stats.size() > 0); std::sort(stats.begin(), stats.end(), DurationCmp()); diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index 17c38ab6e3a..9a0f1778cb6 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -31,14 +31,14 @@ struct Server : public FRT_Invokable void rpc_hook(FRT_RPCRequest *req) { FNET_Connection *conn = req->GetConnection(); - conn->AddRef(); // need to keep it alive + conn->internal_addref(); // need to keep it alive req->Detach(); req->Return(); // will free request channel FRT_RPCRequest *r = orb.AllocRPCRequest(); r->SetMethodName("frt.rpc.ping"); // might re-use request channel before it is unlinked from hashmap orb.InvokeAsync(orb.GetTransport(), conn, r, 5.0, &receptor); - conn->SubRef(); // invocation will now keep the connection alive as needed + conn->internal_subref(); // invocation will now keep the connection alive as needed } }; @@ -61,11 +61,11 @@ TEST("detach return invoke") { } std::this_thread::sleep_for(10ms); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); if (receptor.req.load() != nullptr) { EXPECT_TRUE(!receptor.req.load()->IsError()); - receptor.req.load()->SubRef(); + receptor.req.load()->internal_subref(); } EXPECT_TRUE(receptor.req.load() != nullptr); }; diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index e930c1252bf..f06c7428c22 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -65,7 +65,7 @@ public: } ~MyReq() { if (_req != nullptr) { - _req->SubRef(); + _req->internal_subref(); } } MyReq(const MyReq &rhs) = delete; @@ -331,7 +331,7 @@ public: } ~Fixture() { - _target->SubRef(); + _target->internal_subref(); } }; @@ -410,7 +410,7 @@ TEST_F("require that a bad target gives connection error", Fixture()) { { FRT_Target *bad_target = f1.make_bad_target(); bad_target->InvokeSync(req.borrow(), timeout); - bad_target->SubRef(); + bad_target->internal_subref(); } EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_CONNECTION); } diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp index 2ccb44d03cb..94f57a136a4 100644 --- a/fnet/src/tests/frt/rpc/sharedblob.cpp +++ b/fnet/src/tests/frt/rpc/sharedblob.cpp @@ -122,7 +122,7 @@ struct ServerSampler : public FRT_Invokable dataSet.sample(*req->GetReturn()); // server return before drop // keep request to sample return after drop - req->AddRef(); + req->internal_addref(); serverReq = req; } }; @@ -176,7 +176,7 @@ TEST("testExplicitShared") { req->GetParams()->AddSharedData(&blob); EXPECT_EQUAL(4, blob.refcnt); - req->SubRef(); + req->internal_subref(); EXPECT_EQUAL(1, blob.refcnt); } @@ -262,10 +262,10 @@ TEST("testImplicitShared") { } if (serverSampler.serverReq != 0) { - serverSampler.serverReq->SubRef(); + serverSampler.serverReq->internal_subref(); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index 00075cb75dd..92fe6526a10 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -63,9 +63,9 @@ TEST("info") { fprintf(stderr, "FD_SETSIZE: %d\n", l[2]._intval32); fprintf(stderr, "sizeof(FRT_RPCRequest): %d\n", l[3]._intval32); - target->SubRef(); - local_info->SubRef(); - remote_info->SubRef(); + target->internal_subref(); + local_info->internal_subref(); + remote_info->internal_subref(); }; TEST("size of important objects") diff --git a/fnet/src/tests/printstuff/printstuff_test.cpp b/fnet/src/tests/printstuff/printstuff_test.cpp index a9621728c5a..bd76ad29405 100644 --- a/fnet/src/tests/printstuff/printstuff_test.cpp +++ b/fnet/src/tests/printstuff/printstuff_test.cpp @@ -37,7 +37,7 @@ TEST("rpc packets in a queue") { q2.QueuePacket(&req->getStash().create<FRT_RPCRequestPacket>(req, 0, false), FNET_Context()); q2.Print(); } - req->SubRef(); + req->internal_subref(); } TEST("info") { diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp index a363b1df4c2..eaf2fd71bde 100644 --- a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp +++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp @@ -113,9 +113,9 @@ TEST_FF("transport layers can be run with transport debugger", Fixture(), vespal EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT); ASSERT_TRUE(req6->CheckReturnTypes("l")); EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u); - target->SubRef(); - req4->SubRef(); - req6->SubRef(); + target->internal_subref(); + req4->internal_subref(); + req6->internal_subref(); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index e344f2a22a6..fef8a6bf01b 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -63,7 +63,7 @@ struct DoHandshakeWork : vespalib::Executor::Task { DoHandshakeWork(FNET_Connection *conn_in, vespalib::CryptoSocket *socket_in) : conn(conn_in), socket(socket_in) { - conn->AddRef(); + conn->internal_addref(); } void run() override { socket->do_handshake_work(); @@ -82,7 +82,7 @@ FNET_Connection::ResolveHandler::ResolveHandler(FNET_Connection *conn) : connection(conn), address() { - connection->AddRef(); + connection->internal_addref(); } void @@ -94,7 +94,7 @@ FNET_Connection::ResolveHandler::handle_result(vespalib::SocketAddress result) FNET_Connection::ResolveHandler::~ResolveHandler() { - connection->SubRef(); + connection->internal_subref(); } @@ -149,10 +149,9 @@ FNET_Connection::SetState(State state) } if ( ! toDelete.empty() ) { - for (const FNET_Channel::UP & ch : toDelete) { - (void) ch; - SubRef_NoLock(); - } + const uint32_t cnt = toDelete.size(); + const uint32_t reserve = 1; + internal_subref(cnt, reserve); } } @@ -185,14 +184,14 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, _channels.Unregister(channel); if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) { - SubRef_NoLock(); + internal_subref(1, 1); toDelete.reset(channel); } } } else if (CanAcceptChannels() && IsFromPeer(chid)) { // open new channel FNET_Channel::UP newChannel(new FNET_Channel(chid, this)); channel = newChannel.get(); - AddRef_NoLock(); + internal_addref(); BeforeCallback(guard, channel); if (_serverAdapter->InitChannel(channel, pcode)) { @@ -203,7 +202,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, AfterCallback(guard); if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) { - SubRef_NoLock(); + internal_subref(1, 1); } else if (hp_rc == FNET_IPacketHandler::FNET_KEEP_CHANNEL) { _channels.Register(newChannel.release()); } else { @@ -212,7 +211,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, } else { AfterCallback(guard); - SubRef_NoLock(); + internal_subref(1, 1); guard.unlock(); LOG(debug, "Connection(%s): channel init failed", GetSpec()); @@ -489,8 +488,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _myQueue(256), _output(0), _channels(), - _callbackTarget(nullptr), - _cleanup(nullptr) + _callbackTarget(nullptr) { assert(_socket && (_socket->get_fd() >= 0)); _num_connections.fetch_add(1, std::memory_order_relaxed); @@ -520,8 +518,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _myQueue(256), _output(0), _channels(), - _callbackTarget(nullptr), - _cleanup(nullptr) + _callbackTarget(nullptr) { _num_connections.fetch_add(1, std::memory_order_relaxed); } @@ -530,7 +527,6 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, FNET_Connection::~FNET_Connection() { assert(!_resolve_handler); - assert(_cleanup == nullptr); _num_connections.fetch_sub(1, std::memory_order_relaxed); } @@ -576,12 +572,6 @@ FNET_Connection::handle_handshake_act() return ((GetState() == FNET_CONNECTING) && handshake()); } -void -FNET_Connection::SetCleanupHandler(FNET_IConnectionCleanupHandler *handler) -{ - _cleanup = handler; -} - FNET_Channel* FNET_Connection::OpenChannel(FNET_IPacketHandler *handler, @@ -598,7 +588,7 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler, *chid = newChannel->GetID(); } WaitCallback(guard, nullptr); - AddRef_NoLock(); + internal_addref(); ret = newChannel.release(); _channels.Register(ret); } @@ -614,7 +604,7 @@ FNET_Connection::OpenChannel() { std::lock_guard<std::mutex> guard(_ioc_lock); chid = GetNextID(); - AddRef_NoLock(); + internal_addref(); } return new FNET_Channel(chid, this); } @@ -633,18 +623,20 @@ void FNET_Connection::FreeChannel(FNET_Channel *channel) { delete channel; - SubRef_HasLock(std::unique_lock<std::mutex>(_ioc_lock)); + internal_subref(); } void FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel) { - std::unique_lock<std::mutex> guard(_ioc_lock); - WaitCallback(guard, channel); - _channels.Unregister(channel); - SubRef_HasLock(std::move(guard)); - delete channel; + { + std::unique_lock<std::mutex> guard(_ioc_lock); + WaitCallback(guard, channel); + _channels.Unregister(channel); + delete channel; + } + internal_subref(); } @@ -668,7 +660,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) _writeWork++; _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); if ((writeWork == 0) && (GetState() == FNET_CONNECTED)) { - AddRef_NoLock(); + internal_addref(); guard.unlock(); Owner()->EnableWrite(this, /* needRef = */ false); } @@ -686,16 +678,6 @@ FNET_Connection::Sync() void -FNET_Connection::CleanupHook() -{ - if (_cleanup != nullptr) { - _cleanup->Cleanup(this); - _cleanup = nullptr; - } -} - - -void FNET_Connection::Close() { _resolve_handler.reset(); diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 10cf74e79de..80927fd375c 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -21,30 +21,6 @@ class FNET_IPacketHandler; namespace vespalib::net { class ConnectionAuthContext; } /** - * Interface implemented by objects that want to perform connection - * cleanup. Use the SetCleanupHandler method to register with a - * connection. Currently, there can only be one cleanup handler per - * connection. - **/ -class FNET_IConnectionCleanupHandler -{ -public: - - /** - * Destructor. No cleanup needed for base class. - */ - virtual ~FNET_IConnectionCleanupHandler(void) {} - - /** - * Perform connection cleanup. - * - * @param conn the connection - **/ - virtual void Cleanup(FNET_Connection *conn) = 0; -}; - - -/** * This class represents a single connection with another * computer. The binary format on a connection is defined by the * PacketStreamer given to the constructor. Each connection object may @@ -115,8 +91,6 @@ private: FNET_ChannelLookup _channels; // channel 'DB' FNET_Channel *_callbackTarget; // target of current callback - FNET_IConnectionCleanupHandler *_cleanup; // cleanup handler - std::unique_ptr<vespalib::net::ConnectionAuthContext> _auth_context; static std::atomic<uint64_t> _num_connections; // total number of connections @@ -377,14 +351,6 @@ public: **/ bool handle_handshake_act() override; - /** - * Register a cleanup handler to be invoked when this connection is - * about to be destructed. - * - * @param handler the cleanup handler - **/ - void SetCleanupHandler(FNET_IConnectionCleanupHandler *handler); - /** * Open a new channel on this connection. This method will return @@ -468,14 +434,6 @@ public: /** - * Invoked by the io component superclass before the object is - * destructed. Will invoke the Cleanup method on the cleanup handler - * for this connection, if present. - **/ - void CleanupHook() override; - - - /** * Close this connection immidiately. NOTE: this method should only * be called by the transport thread. **/ diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp index d4b35720a8d..421bd0c4d0b 100644 --- a/fnet/src/vespa/fnet/frt/invoker.cpp +++ b/fnet/src/vespa/fnet/frt/invoker.cpp @@ -98,7 +98,7 @@ FRT_RPCInvoker::HandleDone(bool freeChannel) } // send response to client or get rid of it if (_noReply || (_req->GetErrorCode() == FRTE_RPC_BAD_REQUEST)) - _req->SubRef(); + _req->internal_subref(); else ch->Send(_req->CreateReplyPacket()); @@ -128,7 +128,7 @@ void FRT_HookInvoker::Invoke() _req->SetDetachedPT(&detached); (_hook->GetHandler()->*_hook->GetMethod())(_req); assert(!detached); - _req->SubRef(); + _req->internal_subref(); } void diff --git a/fnet/src/vespa/fnet/frt/packets.cpp b/fnet/src/vespa/fnet/frt/packets.cpp index 134a869eafb..5e600c8caa3 100644 --- a/fnet/src/vespa/fnet/frt/packets.cpp +++ b/fnet/src/vespa/fnet/frt/packets.cpp @@ -14,7 +14,7 @@ FRT_RPCPacket::Free() { if (_ownsRef) { _req->DiscardBlobs(); - _req->SubRef(); + _req->internal_subref(); } } diff --git a/fnet/src/vespa/fnet/frt/reflection.cpp b/fnet/src/vespa/fnet/frt/reflection.cpp index af7fa069eb9..c09057ea675 100644 --- a/fnet/src/vespa/fnet/frt/reflection.cpp +++ b/fnet/src/vespa/fnet/frt/reflection.cpp @@ -153,7 +153,7 @@ FRT_ReflectionBuilder::FRT_ReflectionBuilder(FRT_Supervisor *supervisor) FRT_ReflectionBuilder::~FRT_ReflectionBuilder() { Flush(); - _req->SubRef(); + _req->internal_subref(); } diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.cpp b/fnet/src/vespa/fnet/frt/rpcrequest.cpp index ac6dbb26ad6..6870a275ab1 100644 --- a/fnet/src/vespa/fnet/frt/rpcrequest.cpp +++ b/fnet/src/vespa/fnet/frt/rpcrequest.cpp @@ -10,7 +10,6 @@ FRT_RPCRequest::FRT_RPCRequest() _context(), _params(_stash), _return(_stash), - _refcnt(1), _completed(0), _errorCode(FRTE_NO_ERROR), _errorMessageLen(0), @@ -19,14 +18,10 @@ FRT_RPCRequest::FRT_RPCRequest() _methodName(nullptr), _detachedPT(nullptr), _abortHandler(nullptr), - _returnHandler(nullptr), - _cleanupHandler(nullptr) + _returnHandler(nullptr) { } -FRT_RPCRequest::~FRT_RPCRequest() -{ - assert(_refcnt == 0); -} +FRT_RPCRequest::~FRT_RPCRequest() = default; void FRT_RPCRequest::SetError(uint32_t errorCode, const char *errorMessage, uint32_t errorMessageLen) @@ -87,18 +82,9 @@ FRT_RPCRequest::GetConnection() { return _returnHandler->GetConnection(); } -void -FRT_RPCRequest::Cleanup() { - if (_cleanupHandler != nullptr) { - _cleanupHandler->HandleCleanup(); - _cleanupHandler = nullptr; - } -} void FRT_RPCRequest::Reset() { - assert(_refcnt <= 1); - Cleanup(); _context = FNET_Context(); _params.Reset(); _return.Reset(); @@ -118,7 +104,7 @@ FRT_RPCRequest::Reset() { bool FRT_RPCRequest::Recycle() { - if (_refcnt > 1 || _errorCode != FRTE_NO_ERROR) + if (count_refs() > 1 || _errorCode != FRTE_NO_ERROR) return false; Reset(); return true; @@ -126,18 +112,6 @@ FRT_RPCRequest::Recycle() void -FRT_RPCRequest::SubRef() -{ - int oldVal = _refcnt.fetch_sub(1); - assert(oldVal > 0); - if (oldVal == 1) { - Reset(); - delete this; - } -} - - -void FRT_RPCRequest::Print(uint32_t indent) { printf("%*sFRT_RPCRequest {\n", indent, ""); @@ -163,7 +137,7 @@ FRT_RPCRequest::CreateRequestPacket(bool wantReply) flags |= FLAG_FRT_RPC_LITTLE_ENDIAN; if (wantReply) - AddRef(); + internal_addref(); else flags |= FLAG_FRT_RPC_NOREPLY; diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.h b/fnet/src/vespa/fnet/frt/rpcrequest.h index a095c274687..72dae4a6af1 100644 --- a/fnet/src/vespa/fnet/frt/rpcrequest.h +++ b/fnet/src/vespa/fnet/frt/rpcrequest.h @@ -6,6 +6,7 @@ #include "error.h" #include <vespa/fnet/context.h> #include <vespa/vespalib/util/stash.h> +#include <vespa/vespalib/util/ref_counted.h> #include <atomic> class FNET_Packet; @@ -37,20 +38,7 @@ public: }; -class FRT_ICleanupHandler -{ -public: - - /** - * Destructor. No cleanup needed for base class. - */ - virtual ~FRT_ICleanupHandler(void) {} - - virtual void HandleCleanup() = 0; -}; - - -class FRT_RPCRequest +class FRT_RPCRequest : public vespalib::enable_ref_counted { private: using Stash = vespalib::Stash; @@ -58,7 +46,6 @@ private: FNET_Context _context; FRT_Values _params; FRT_Values _return; - std::atomic<int> _refcnt; std::atomic<int> _completed; uint32_t _errorCode; uint32_t _errorMessageLen; @@ -69,7 +56,6 @@ private: bool *_detachedPT; FRT_IAbortHandler *_abortHandler; FRT_IReturnHandler *_returnHandler; - FRT_ICleanupHandler *_cleanupHandler; public: FRT_RPCRequest(const FRT_RPCRequest &) = delete; @@ -86,9 +72,6 @@ public: _return.DiscardBlobs(); } - void AddRef() { _refcnt.fetch_add(1); } - void SubRef(); - void SetContext(FNET_Context context) { _context = context; } FNET_Context GetContext() { return _context; } @@ -137,10 +120,8 @@ public: void SetAbortHandler(FRT_IAbortHandler *handler) { _abortHandler = handler; } void SetReturnHandler(FRT_IReturnHandler *handler) { _returnHandler = handler; } - void SetCleanupHandler(FRT_ICleanupHandler *handler) { _cleanupHandler = handler; } bool Abort(); void Return(); FNET_Connection *GetConnection(); - void Cleanup(); }; diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index 966c606bf97..7d6b4d727c7 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -31,7 +31,7 @@ FRT_Supervisor::~FRT_Supervisor() { _transport->detach(this); if (_connector != nullptr) { - _connector->SubRef(); + _connector->internal_subref(); } } @@ -99,7 +99,7 @@ FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein) if (tradein->Recycle()) { return tradein; } - tradein->SubRef(); + tradein->internal_subref(); } return new FRT_RPCRequest(); } @@ -113,7 +113,7 @@ FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req) ch->Send(req->CreateRequestPacket(false)); ch->Free(); } else { - req->SubRef(); + req->internal_subref(); } } diff --git a/fnet/src/vespa/fnet/frt/target.cpp b/fnet/src/vespa/fnet/frt/target.cpp index 1645fba91ee..47baa95b9e2 100644 --- a/fnet/src/vespa/fnet/frt/target.cpp +++ b/fnet/src/vespa/fnet/frt/target.cpp @@ -6,7 +6,6 @@ FRT_Target::~FRT_Target() { - assert(_refcnt == 0); FNET_Connection * conn(_conn); _conn = nullptr; if (conn != nullptr) { diff --git a/fnet/src/vespa/fnet/frt/target.h b/fnet/src/vespa/fnet/frt/target.h index ac6bf377a14..773daaca8b1 100644 --- a/fnet/src/vespa/fnet/frt/target.h +++ b/fnet/src/vespa/fnet/frt/target.h @@ -3,16 +3,16 @@ #pragma once #include <vespa/fnet/connection.h> +#include <vespa/vespalib/util/ref_counted.h> #include <atomic> class FNET_Scheduler; class FRT_RPCRequest; class FRT_IRequestWait; -class FRT_Target +class FRT_Target : public vespalib::enable_ref_counted { private: - std::atomic<int> _refcnt; FNET_Scheduler *_scheduler; FNET_Connection *_conn; @@ -21,23 +21,12 @@ private: public: FRT_Target(FNET_Scheduler *scheduler, FNET_Connection *conn) - : _refcnt(1), - _scheduler(scheduler), + : _scheduler(scheduler), _conn(conn) {} ~FRT_Target(); FNET_Connection *GetConnection() const { return _conn; } - - void AddRef() { _refcnt.fetch_add(1); } - void SubRef() { - if (_refcnt.fetch_sub(1) == 1) { - delete this; - } - } - - int GetRefCnt() const { return _refcnt; } - bool IsValid() { return ((_conn != nullptr) && (_conn->GetState() <= FNET_Connection::FNET_CONNECTED)); diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index f08718c0c5c..c4fc7d859d4 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -16,7 +16,6 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner, _ioc_spec(spec), _flags(shouldTimeOut), _ioc_socket_fd(socket_fd), - _ioc_refcnt(1), _ioc_timestamp(vespalib::steady_clock::now()), _ioc_lock(), _ioc_cond() @@ -39,58 +38,6 @@ FNET_IOComponent::UpdateTimeOut() { _ioc_owner->UpdateTimeOut(this); } -void -FNET_IOComponent::AddRef() -{ - std::lock_guard<std::mutex> guard(_ioc_lock); - assert(_ioc_refcnt > 0); - _ioc_refcnt++; -} - - -void -FNET_IOComponent::AddRef_NoLock() -{ - assert(_ioc_refcnt > 0); - _ioc_refcnt++; -} - - -void -FNET_IOComponent::SubRef() -{ - { - std::lock_guard<std::mutex> guard(_ioc_lock); - assert(_ioc_refcnt > 0); - if (--_ioc_refcnt > 0) { - return; - } - } - CleanupHook(); - delete this; -} - - -void -FNET_IOComponent::SubRef_HasLock(std::unique_lock<std::mutex> guard) -{ - assert(_ioc_refcnt > 0); - if (--_ioc_refcnt > 0) { - return; - } - guard.unlock(); - CleanupHook(); - delete this; -} - - -void -FNET_IOComponent::SubRef_NoLock() -{ - assert(_ioc_refcnt > 1); - _ioc_refcnt--; -} - void FNET_IOComponent::attach_selector(Selector &selector) @@ -141,8 +88,3 @@ FNET_IOComponent::handle_handshake_act() { return true; } - -void -FNET_IOComponent::CleanupHook() -{ -} diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index 106e31c9236..b88b2700db5 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -4,6 +4,7 @@ #include "scheduler.h" #include <vespa/vespalib/net/selector.h> +#include <vespa/vespalib/util/ref_counted.h> #include <mutex> #include <condition_variable> #include <chrono> @@ -18,7 +19,7 @@ class FNET_Config; * Components do IO against the network and that they use sockets to * perform that IO. **/ -class FNET_IOComponent +class FNET_IOComponent : public vespalib::enable_ref_counted { friend class FNET_TransportThread; @@ -46,7 +47,6 @@ protected: std::string _ioc_spec; // connect/listen spec Flags _flags; // Compressed representation of boolean flags; int _ioc_socket_fd; // source of events. - uint32_t _ioc_refcnt; // reference counter vespalib::steady_time _ioc_timestamp; // last I/O activity std::mutex _ioc_lock; // synchronization std::condition_variable _ioc_cond; // synchronization @@ -88,43 +88,6 @@ public: std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_ioc_lock); } /** - * Allocate a reference to this component. This method locks the - * object to protect the reference counter. - **/ - void AddRef(); - - - /** - * Allocate a reference to this component without locking the - * object. Caller already has lock on object. - **/ - void AddRef_NoLock(); - - - /** - * Free a reference to this component. This method locks the object - * to protect the reference counter. - **/ - void SubRef(); - - - /** - * Free a reference to this component. This method uses locking to - * protect the reference counter, but assumes that the lock has - * already been obtained when this method is called. - **/ - void SubRef_HasLock(std::unique_lock<std::mutex> guard); - - - /** - * Free a reference to this component without locking the - * object. NOTE: this method may only be called on objects with more - * than one reference. - **/ - void SubRef_NoLock(); - - - /** * @return the owning TransportThread object. **/ FNET_TransportThread *Owner() { return _ioc_owner; } @@ -217,13 +180,6 @@ public: **/ virtual bool handle_handshake_act(); - /** - * This method is called by the SubRef methods just before the - * object is deleted. It may be used to perform cleanup tasks that - * must be done before the destructor is invoked. - **/ - virtual void CleanupHook(); - /** * Close this component immediately. NOTE: this method should only diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 970dc40150f..0b0df02c04c 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -107,7 +107,7 @@ FNET_TransportThread::FlushDeleteList() FNET_IOComponent *tmp = _deleteList; _deleteList = tmp->_ioc_next; assert(tmp->_flags._ioc_delete); - tmp->SubRef(); + tmp->internal_subref(); } } @@ -143,12 +143,12 @@ FNET_TransportThread::DiscardEvent(FNET_ControlPacket *cpacket, switch (cpacket->GetCommand()) { case FNET_ControlPacket::FNET_CMD_IOC_ADD: context._value.IOC->Close(); - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); break; case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE: case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT: case FNET_ControlPacket::FNET_CMD_IOC_CLOSE: - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); break; } } @@ -173,7 +173,7 @@ FNET_TransportThread::handle_close_cmd(FNET_IOComponent *ioc) { if (ioc->_flags._ioc_added) { RemoveComponent(ioc); - ioc->SubRef(); + ioc->internal_subref(); } ioc->Close(); AddDeleteComponent(ioc); @@ -288,7 +288,7 @@ FNET_TransportThread::Listen(const char *spec, FNET_IPacketStreamer *streamer, if (server_socket.valid() && server_socket.set_blocking(false)) { FNET_Connector *connector = new FNET_Connector(this, streamer, serverAdapter, spec, std::move(server_socket)); connector->EnableReadEvent(true); - connector->AddRef_NoLock(); + connector->internal_addref(); Add(connector, /* needRef = */ false); return connector; } @@ -314,7 +314,7 @@ void FNET_TransportThread::Add(FNET_IOComponent *comp, bool needRef) { if (needRef) { - comp->AddRef(); + comp->internal_addref(); } PostEvent(&FNET_ControlPacket::IOCAdd, FNET_Context(comp)); } @@ -324,7 +324,7 @@ void FNET_TransportThread::EnableWrite(FNET_IOComponent *comp, bool needRef) { if (needRef) { - comp->AddRef(); + comp->internal_addref(); } PostEvent(&FNET_ControlPacket::IOCEnableWrite, FNET_Context(comp)); } @@ -333,7 +333,7 @@ void FNET_TransportThread::handshake_act(FNET_IOComponent *comp, bool needRef) { if (needRef) { - comp->AddRef(); + comp->internal_addref(); } PostEvent(&FNET_ControlPacket::IOCHandshakeACT, FNET_Context(comp)); } @@ -342,7 +342,7 @@ void FNET_TransportThread::Close(FNET_IOComponent *comp, bool needRef) { if (needRef) { - comp->AddRef(); + comp->internal_addref(); } PostEvent(&FNET_ControlPacket::IOCClose, FNET_Context(comp)); } @@ -449,7 +449,7 @@ FNET_TransportThread::handle_wakeup() } if (context._value.IOC->_flags._ioc_delete) { - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); continue; } @@ -460,14 +460,14 @@ FNET_TransportThread::handle_wakeup() case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE: context._value.IOC->EnableWriteEvent(true); if (context._value.IOC->HandleWriteEvent()) { - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); } else { handle_close_cmd(context._value.IOC); } break; case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT: if (context._value.IOC->handle_handshake_act()) { - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); } else { handle_close_cmd(context._value.IOC); } @@ -577,7 +577,7 @@ FNET_TransportThread::endEventLoop() { component = component->_ioc_next; RemoveComponent(tmp); tmp->Close(); - tmp->SubRef(); + tmp->internal_subref(); } assert(_componentsHead == nullptr && _componentsTail == nullptr && diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 1744a3d60e5..6047d4e3482 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -97,7 +97,7 @@ private: /** - * Delete (call SubRef on) all IO Components in the delete list. + * Delete (call internal_subref on) all IO Components in the delete list. **/ void FlushDeleteList(); @@ -277,7 +277,7 @@ public: * @param needRef should be set to false if the caller of this * method already has obtained an extra reference to the * component. If this flag is true, this method will call the - * AddRef method on the component. + * internal_addref method on the component. **/ void Add(FNET_IOComponent *comp, bool needRef = true); @@ -294,7 +294,7 @@ public: * @param needRef should be set to false if the caller of this * method already has obtained an extra reference to the * component. If this flag is true, this method will call the - * AddRef method on the component. + * internal_addref method on the component. **/ void EnableWrite(FNET_IOComponent *comp, bool needRef = true); @@ -312,7 +312,7 @@ public: * @param needRef should be set to false if the caller of this * method already has obtained an extra reference to the * component. If this flag is true, this method will call the - * AddRef method on the component. + * internal_addref method on the component. **/ void handshake_act(FNET_IOComponent *comp, bool needRef = true); @@ -330,7 +330,7 @@ public: * @param needRef should be set to false if the caller of this * method already has obtained an extra reference to the * component. If this flag is true, this method will call the - * AddRef method on the component. + * internal_addref method on the component. **/ void Close(FNET_IOComponent *comp, bool needRef = true); |