diff options
author | HÃ¥vard Pettersen <3535158+havardpe@users.noreply.github.com> | 2023-03-07 11:22:14 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-07 11:22:14 +0100 |
commit | 028316d562f645540fb27c9be5b8c8b9249d9d11 (patch) | |
tree | 4f96ae0f5bfdc203247b4637fc6c93b6b98b40aa | |
parent | f018f5ac14b97ff8ed6b765de7816e907b9624a5 (diff) | |
parent | 2b836792201441d84404ff3a5cf59b3652ae244e (diff) |
Merge pull request #26320 from vespa-engine/havardpe/use-ref-counted-in-fnet
use ref_counted in fnet
78 files changed, 300 insertions, 491 deletions
diff --git a/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp b/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp index 1d875e0ade2..2bc1c3e94d1 100644 --- a/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp +++ b/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp @@ -39,11 +39,11 @@ void ProxyCmd::invokeRPC() { void ProxyCmd::finiRPC() { if (_req != nullptr) { - _req->SubRef(); + _req->internal_subref(); _req = nullptr; } if (_target != nullptr) { - _target->SubRef(); + _target->internal_subref(); _target = NULL; } _server.reset(); diff --git a/config/src/apps/vespa-get-config/getconfig.cpp b/config/src/apps/vespa-get-config/getconfig.cpp index 03a049044ae..a31caefe054 100644 --- a/config/src/apps/vespa-get-config/getconfig.cpp +++ b/config/src/apps/vespa-get-config/getconfig.cpp @@ -85,7 +85,7 @@ void GetConfig::finiRPC() { if (_target != nullptr) { - _target->SubRef(); + _target->internal_subref(); _target = nullptr; } _server.reset(); diff --git a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp index e8423eba233..83a10416852 100644 --- a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp +++ b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp @@ -59,7 +59,7 @@ void PingProxy::finiRPC() { if (_target != nullptr) { - _target->SubRef(); + _target->internal_subref(); _target = nullptr; } _server.reset(); @@ -146,7 +146,7 @@ PingProxy::main(int argc, char **argv) retval = 1; } } - req->SubRef(); + req->internal_subref(); finiRPC(); return retval; } diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp index 4e77b289f49..c5098d0e7a1 100644 --- a/config/src/tests/frt/frt.cpp +++ b/config/src/tests/frt/frt.cpp @@ -96,7 +96,7 @@ struct Response { ~RPCFixture() { for (size_t i = 0; i < requests.size(); i++) { - requests[i]->SubRef(); + requests[i]->internal_subref(); } } }; @@ -340,7 +340,7 @@ struct V3RequestFixture { } ~V3RequestFixture() { - req->SubRef(); + req->internal_subref(); } void encodePayload(const char * payload, uint32_t payloadSize, uint32_t uncompressedSize, const CompressionType & compressionType) { diff --git a/config/src/vespa/config/file_acquirer/file_acquirer.cpp b/config/src/vespa/config/file_acquirer/file_acquirer.cpp index 4ee5b4aa4a9..f38875b5727 100644 --- a/config/src/vespa/config/file_acquirer/file_acquirer.cpp +++ b/config/src/vespa/config/file_acquirer/file_acquirer.cpp @@ -31,8 +31,8 @@ RpcFileAcquirer::wait_for(const vespalib::string &file_ref, double timeout_s) LOG(warning, "could not acquire file '%s' (%d: %s)", file_ref.c_str(), req->GetErrorCode(), req->GetErrorMessage()); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return path; } diff --git a/config/src/vespa/config/frt/frtconfigrequest.cpp b/config/src/vespa/config/frt/frtconfigrequest.cpp index 2106da64857..e7da9f53e97 100644 --- a/config/src/vespa/config/frt/frtconfigrequest.cpp +++ b/config/src/vespa/config/frt/frtconfigrequest.cpp @@ -17,7 +17,7 @@ FRTConfigRequest::FRTConfigRequest(Connection * connection, const ConfigKey & ke FRTConfigRequest::~FRTConfigRequest() { - _request->SubRef(); + _request->internal_subref(); } bool diff --git a/config/src/vespa/config/frt/frtconfigresponse.cpp b/config/src/vespa/config/frt/frtconfigresponse.cpp index 43ae3f9b192..28c2825f71c 100644 --- a/config/src/vespa/config/frt/frtconfigresponse.cpp +++ b/config/src/vespa/config/frt/frtconfigresponse.cpp @@ -10,12 +10,12 @@ FRTConfigResponse::FRTConfigResponse(FRT_RPCRequest * request) _responseState(EMPTY), _returnValues(_request->GetReturn()) { - _request->AddRef(); + _request->internal_addref(); } FRTConfigResponse::~FRTConfigResponse() { - _request->SubRef(); + _request->internal_subref(); } bool diff --git a/config/src/vespa/config/frt/frtconnection.cpp b/config/src/vespa/config/frt/frtconnection.cpp index be8ccead65c..ff2a82f855b 100644 --- a/config/src/vespa/config/frt/frtconnection.cpp +++ b/config/src/vespa/config/frt/frtconnection.cpp @@ -30,7 +30,7 @@ FRTConnection::~FRTConnection() { if (_target != nullptr) { LOG(debug, "Shutting down %s", _address.c_str()); - _target->SubRef(); + _target->internal_subref(); _target = nullptr; } } @@ -42,10 +42,10 @@ FRTConnection::getTarget() if (_target == nullptr) { _target = _supervisor.GetTarget(_address.c_str()); } else if ( ! _target->IsValid()) { - _target->SubRef(); + _target->internal_subref(); _target = _supervisor.GetTarget(_address.c_str()); } - _target->AddRef(); + _target->internal_addref(); return _target; } @@ -54,7 +54,7 @@ FRTConnection::invoke(FRT_RPCRequest * req, duration timeout, FRT_IRequestWait * { FRT_Target * target = getTarget(); target->InvokeAsync(req, vespalib::to_s(timeout), waiter); - target->SubRef(); + target->internal_subref(); } void diff --git a/configd/src/apps/cmd/main.cpp b/configd/src/apps/cmd/main.cpp index 3ea26fee150..9ee7130a06e 100644 --- a/configd/src/apps/cmd/main.cpp +++ b/configd/src/apps/cmd/main.cpp @@ -74,7 +74,7 @@ void Cmd::finiRPC() { if (_target != nullptr) { - _target->SubRef(); + _target->internal_subref(); _target = nullptr; } _server.reset(); @@ -150,7 +150,7 @@ Cmd::run(const Method &cmd, const char *arg) } } } - req->SubRef(); + req->internal_subref(); finiRPC(); return retval; } diff --git a/configd/src/apps/sentinel/outward-check.cpp b/configd/src/apps/sentinel/outward-check.cpp index b81c8e23750..b8bb2e69077 100644 --- a/configd/src/apps/sentinel/outward-check.cpp +++ b/configd/src/apps/sentinel/outward-check.cpp @@ -53,9 +53,9 @@ void OutwardCheck::RequestDone(FRT_RPCRequest *req) { req->GetErrorMessage(), req->GetErrorCode()); _result = CcResult::CONN_FAIL; } - _req->SubRef(); + _req->internal_subref(); _req = nullptr; - _target->SubRef(); + _target->internal_subref(); _target = nullptr; _context.latch.countDown(); } diff --git a/configd/src/apps/sentinel/peer-check.cpp b/configd/src/apps/sentinel/peer-check.cpp index 0af5a6fb58f..ac3775d4c4d 100644 --- a/configd/src/apps/sentinel/peer-check.cpp +++ b/configd/src/apps/sentinel/peer-check.cpp @@ -39,9 +39,9 @@ void PeerCheck::RequestDone(FRT_RPCRequest *req) { LOG(debug, "OK ping to %s [port %d]", _hostname.c_str(), _portnum); _statusOk = true; } - _req->SubRef(); + _req->internal_subref(); _req = nullptr; - _target->SubRef(); + _target->internal_subref(); _target = nullptr; // Note: will delete this object, so must be called as final step: _callback.returnStatus(_statusOk); diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp index 0176337c466..869a010bfda 100644 --- a/fnet/src/examples/frt/rpc/echo_client.cpp +++ b/fnet/src/examples/frt/rpc/echo_client.cpp @@ -85,8 +85,8 @@ EchoClient::main(int argc, char **argv) } else { printf("Return values != parameters.\n"); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp index b41c40ba29d..c52d48f24eb 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp @@ -102,8 +102,8 @@ MyApp::main(int argc, char **argv) ok = false; } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return ok ? 0 : 1; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp index 37a819c6ee7..5c21f73da7f 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp @@ -27,7 +27,7 @@ void do_callback(FRT_RPCRequest *req) { cb->GetErrorCode(), cb->GetErrorMessage()); } - cb->SubRef(); + cb->internal_subref(); req->Return(); } diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp index acb20a880c9..55168a2acba 100644 --- a/fnet/src/examples/frt/rpc/rpc_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_client.cpp @@ -49,7 +49,7 @@ RPCClient::main(int argc, char **argv) } fprintf(stdout, "\nTesting addFloat method\n"); - req->SubRef(); + req->internal_subref(); req = supervisor.AllocRPCRequest(); req->SetMethodName("addFloat"); req->GetParams()->AddFloat(float1); @@ -65,7 +65,7 @@ RPCClient::main(int argc, char **argv) } fprintf(stdout, "\nTesting addDouble method\n"); - req->SubRef(); + req->internal_subref(); req = supervisor.AllocRPCRequest(); req->SetMethodName("addDouble"); req->GetParams()->AddDouble(double1); @@ -80,8 +80,8 @@ RPCClient::main(int argc, char **argv) req->GetErrorMessage()); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp index 9734342a24e..ab534a254c2 100644 --- a/fnet/src/examples/frt/rpc/rpc_info.cpp +++ b/fnet/src/examples/frt/rpc/rpc_info.cpp @@ -15,16 +15,16 @@ public: void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor) { if ((*req) != nullptr) - (*req)->SubRef(); + (*req)->internal_subref(); (*req) = supervisor->AllocRPCRequest(); } void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2) { if (r1 != nullptr) - r1->SubRef(); + r1->internal_subref(); if (r2 != nullptr) - r2->SubRef(); + r2->internal_subref(); } void DumpMethodInfo(const char *indent, FRT_RPCRequest *info, @@ -130,7 +130,7 @@ RPCInfo::main(int argc, char **argv) m_list->GetErrorMessage()); } FreeReqs(m_list, info); - target->SubRef(); + target->internal_subref(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp index 9f3e90f469a..d56847098d8 100644 --- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp +++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp @@ -113,8 +113,8 @@ RPCClient::run(int argc, char **argv) retCode = 3; } } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return retCode; } diff --git a/fnet/src/examples/ping/pingclient.cpp b/fnet/src/examples/ping/pingclient.cpp index 43296df7e57..b59df31607a 100644 --- a/fnet/src/examples/ping/pingclient.cpp +++ b/fnet/src/examples/ping/pingclient.cpp @@ -86,7 +86,7 @@ PingClient::main(int argc, char **argv) packet->Free(); } if (conn != nullptr) - conn->SubRef(); + conn->internal_subref(); transport.ShutDown(true); return 0; } diff --git a/fnet/src/examples/ping/pingserver.cpp b/fnet/src/examples/ping/pingserver.cpp index fb5b12b66c0..79a67cd18a7 100644 --- a/fnet/src/examples/ping/pingserver.cpp +++ b/fnet/src/examples/ping/pingserver.cpp @@ -51,7 +51,7 @@ PingServer::main(int argc, char **argv) FNET_Connector *listener = transport.Listen(argv[1], &streamer, this); if (listener != nullptr) - listener->SubRef(); + listener->internal_subref(); FNET_SignalShutDown ssd(transport); transport.Main(); diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 9d566cf37a7..d635fea6f94 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -86,25 +86,22 @@ struct BlockingCryptoEngine : public CryptoEngine { //----------------------------------------------------------------------------- -struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { +struct TransportFixture : FNET_IPacketHandler { FNET_SimplePacketStreamer streamer; FNET_Transport transport; Gate conn_lost; - Gate conn_deleted; - TransportFixture() : streamer(nullptr), transport(), - conn_lost(), conn_deleted() - { + TransportFixture() : streamer(nullptr), transport(), conn_lost() { transport.Start(); } TransportFixture(AsyncResolver::HostResolver::SP host_resolver) : streamer(nullptr), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))), - conn_lost(), conn_deleted() + conn_lost() { transport.Start(); } TransportFixture(CryptoEngine::SP crypto) : streamer(nullptr), transport(fnet::TransportConfig().crypto(std::move(crypto))), - conn_lost(), conn_deleted() + conn_lost() { transport.Start(); } @@ -114,14 +111,12 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { packet->Free(); return FNET_FREE_CHANNEL; } - void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); } FNET_Connection *connect(const vespalib::string &spec) { FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer); ASSERT_TRUE(conn != nullptr); if (conn->OpenChannel(this, FNET_Context()) == nullptr) { conn_lost.countDown(); } - conn->SetCleanupHandler(this); return conn; } ~TransportFixture() override { @@ -131,8 +126,26 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { //----------------------------------------------------------------------------- -TEST_MT_FFF("require that normal connect works", 2, - ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60)) +struct ConnCheck { + uint64_t target; + ConnCheck() : target(FNET_Connection::get_num_connections()) { + EXPECT_EQUAL(target, uint64_t(0)); + } + bool at_target() const { return (FNET_Connection::get_num_connections() == target); }; + bool await(duration max_wait) const { + auto until = saturated_add(steady_clock::now(), max_wait); + while (!at_target() && steady_clock::now() < until) { + std::this_thread::sleep_for(1ms); + } + return at_target(); + } + void await() const { + ASSERT_TRUE(await(3600s)); + } +}; + +TEST_MT_FFFF("require that normal connect works", 2, + ServerSocket("tcp/0"), TransportFixture(), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -144,23 +157,23 @@ TEST_MT_FFF("require that normal connect works", 2, TEST_BARRIER(); conn->Owner()->Close(conn); f2.conn_lost.await(); - EXPECT_TRUE(!f2.conn_deleted.await(short_time)); - conn->SubRef(); - f2.conn_deleted.await(); + EXPECT_TRUE(!f3.await(short_time)); + conn->internal_subref(); + f3.await(); } } -TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) { +TEST_FFF("require that bogus connect fail asynchronously", TransportFixture(), ConnCheck(), TimeBomb(60)) { FNET_Connection *conn = f1.connect("invalid"); f1.conn_lost.await(); - EXPECT_TRUE(!f1.conn_deleted.await(short_time)); - conn->SubRef(); - f1.conn_deleted.await(); + EXPECT_TRUE(!f2.await(short_time)); + conn->internal_subref(); + f2.await(); } -TEST_MT_FFFF("require that async close can be called before async resolve completes", 2, - ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()), - TransportFixture(f2), TimeBomb(60)) +TEST_MT_FFFFF("require that async close can be called before async resolve completes", 2, + ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()), + TransportFixture(f2), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -172,16 +185,16 @@ TEST_MT_FFFF("require that async close can be called before async resolve comple conn->Owner()->Close(conn); f3.conn_lost.await(); f2->release_caller(); - EXPECT_TRUE(!f3.conn_deleted.await(short_time)); - conn->SubRef(); - f3.conn_deleted.await(); + EXPECT_TRUE(!f4.await(short_time)); + conn->internal_subref(); + f4.await(); f1.shutdown(); } } -TEST_MT_FFFF("require that async close during async do_handshake_work works", 2, - ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()), - TransportFixture(f2), TimeBomb(60)) +TEST_MT_FFFFF("require that async close during async do_handshake_work works", 2, + ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()), + TransportFixture(f2), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -196,10 +209,10 @@ TEST_MT_FFFF("require that async close during async do_handshake_work works", 2, f3.conn_lost.await(); TEST_BARRIER(); // #1 // verify that pending work keeps relevant objects alive - EXPECT_TRUE(!f3.conn_deleted.await(short_time)); + EXPECT_TRUE(!f4.await(short_time)); EXPECT_TRUE(!f2->handshake_socket_deleted.await(short_time)); f2->handshake_work_exit.countDown(); - f3.conn_deleted.await(); + f4.await(); f2->handshake_socket_deleted.await(); } } diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp index 6286ce65657..5908e6a4982 100644 --- a/fnet/src/tests/connection_spread/connection_spread_test.cpp +++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp @@ -84,9 +84,9 @@ TEST_F("require that connections are spread among transport threads", Fixture) f1.wait_for_components(256, 257); check_threads(f1.client, 8, "client"); check_threads(f1.server, 8, "server"); - listener->SubRef(); + listener->internal_subref(); for (FNET_Connection *conn: connections) { - conn->SubRef(); + conn->internal_subref(); } } diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp index 6325c60413a..c5ca6dc6ce9 100644 --- a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp +++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp @@ -36,7 +36,7 @@ struct RpcFixture : FRT_Invokable { } ~RpcFixture() { if (back_conn.load() != nullptr) { - back_conn.load()->SubRef(); + back_conn.load()->internal_subref(); } } uint32_t port() const { return orb.GetListenPort(); } @@ -61,7 +61,7 @@ struct RpcFixture : FRT_Invokable { ASSERT_TRUE(back_conn.load() == nullptr); back_conn.store(req->GetConnection()); ASSERT_TRUE(back_conn.load() != nullptr); - back_conn.load()->AddRef(); + back_conn.load()->internal_addref(); } FRT_Target *meta_connect(uint32_t port) { auto *target = orb.Get2WayTarget(fmt("tcp/localhost:%u", port).c_str()); @@ -69,7 +69,7 @@ struct RpcFixture : FRT_Invokable { req->SetMethodName("connect"); target->InvokeSync(req, 300.0); ASSERT_TRUE(req->CheckReturnTypes("")); - req->SubRef(); + req->internal_subref(); return target; }; static int check_result(FRT_RPCRequest *req, uint64_t expect) { @@ -81,7 +81,7 @@ struct RpcFixture : FRT_Invokable { ASSERT_EQUAL(ret, expect); ++num_ok; } - req->SubRef(); + req->internal_subref(); return num_ok; } static int verify_rpc(FNET_Connection *conn) { @@ -101,7 +101,7 @@ struct RpcFixture : FRT_Invokable { int verify_rpc(FRT_Target *target, uint32_t port) { auto *my_target = connect(port); int num_ok = verify_rpc(target) + verify_rpc(my_target) + verify_rpc(back_conn.load()); - my_target->SubRef(); + my_target->internal_subref(); return num_ok; } }; @@ -135,8 +135,8 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic EXPECT_EQUAL(RpcFixture::verify_rpc(target), 0); // outgoing 2way target should be closed EXPECT_EQUAL(RpcFixture::verify_rpc(client_target), 1); // pure client target should not be closed TEST_BARRIER(); // #5 - target->SubRef(); - client_target->SubRef(); + target->internal_subref(); + client_target->internal_subref(); } else if (thread_id == 1) { // server 2 (talks to client 2) auto self = std::make_unique<RpcFixture>(f1); f3 = self->port(); @@ -146,7 +146,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #3 TEST_BARRIER(); // #4 TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } else if (thread_id == 2) { // client 1 (talks to server 1) auto self = std::make_unique<RpcFixture>(f1); f4 = self->port(); @@ -165,7 +165,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #4 EXPECT_EQUAL(self->verify_rpc(target, f2), 0); TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } else { // client 2 (talks to server 2) ASSERT_EQUAL(thread_id, 3u); auto self = std::make_unique<RpcFixture>(f1); @@ -179,7 +179,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #4 EXPECT_EQUAL(self->verify_rpc(target, f3), 3); TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } } diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp index 2ac706369ae..d6c42ef7790 100644 --- a/fnet/src/tests/frt/method_pt/method_pt.cpp +++ b/fnet/src/tests/frt/method_pt/method_pt.cpp @@ -174,7 +174,7 @@ void finiTest() { delete _complexHandler; delete _mediumHandler; delete _simpleHandler; - _target->SubRef(); + _target->internal_subref(); _server.reset(); } @@ -187,7 +187,7 @@ TEST("method pt") { //-------------------------------- MEDIUM - req->SubRef(); + req->internal_subref(); req = FRT_Supervisor::AllocRPCRequest(); req->SetMethodName("mediumMethod"); _target->InvokeSync(req, 60.0); @@ -195,7 +195,7 @@ TEST("method pt") { //-------------------------------- COMPLEX - req->SubRef(); + req->internal_subref(); req = FRT_Supervisor::AllocRPCRequest(); req->SetMethodName("complexMethod"); _target->InvokeSync(req, 60.0); @@ -213,7 +213,7 @@ TEST("method pt") { fprintf(stderr, "Object inheritance NOT ok for method handlers\n"); } - req->SubRef(); + req->internal_subref(); } //------------------------------------------------------------- diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 41bfb7d06a6..74f69541d8b 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -125,8 +125,8 @@ void perform_test(size_t thread_id, Client &client, Result &result, bool vital = BenchmarkTimer::benchmark(invoke, invoke, 0.5); EXPECT_GREATER_EQUAL(seq, loop_cnt); result.req_per_sec[thread_id] = double(loop_cnt) / t; - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); TEST_BARRIER(); if (thread_id == 0) { result.print(); diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp index d15fca93c0b..812f0a57c5e 100644 --- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp +++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp @@ -86,7 +86,7 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) { auto invoke = [&seq, &target, &req, &fixture, reconnect](){ TT_Sample sample(req_tag); if (reconnect) { - target->SubRef(); + target->internal_subref(); target = fixture.connect(); } req = fixture.orb.AllocRPCRequest(req); @@ -101,8 +101,8 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) { auto before = TimeTracer::now(); double t = BenchmarkTimer::benchmark(invoke, 5.0); auto after = TimeTracer::now(); - target->SubRef(); - req->SubRef(); + target->internal_subref(); + req->internal_subref(); auto stats = TimeTracer::extract().by_time(before, after).by_tag(req_tag.id()).get(); ASSERT_TRUE(stats.size() > 0); std::sort(stats.begin(), stats.end(), DurationCmp()); diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index 17c38ab6e3a..9a0f1778cb6 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -31,14 +31,14 @@ struct Server : public FRT_Invokable void rpc_hook(FRT_RPCRequest *req) { FNET_Connection *conn = req->GetConnection(); - conn->AddRef(); // need to keep it alive + conn->internal_addref(); // need to keep it alive req->Detach(); req->Return(); // will free request channel FRT_RPCRequest *r = orb.AllocRPCRequest(); r->SetMethodName("frt.rpc.ping"); // might re-use request channel before it is unlinked from hashmap orb.InvokeAsync(orb.GetTransport(), conn, r, 5.0, &receptor); - conn->SubRef(); // invocation will now keep the connection alive as needed + conn->internal_subref(); // invocation will now keep the connection alive as needed } }; @@ -61,11 +61,11 @@ TEST("detach return invoke") { } std::this_thread::sleep_for(10ms); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); if (receptor.req.load() != nullptr) { EXPECT_TRUE(!receptor.req.load()->IsError()); - receptor.req.load()->SubRef(); + receptor.req.load()->internal_subref(); } EXPECT_TRUE(receptor.req.load() != nullptr); }; diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index e930c1252bf..f06c7428c22 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -65,7 +65,7 @@ public: } ~MyReq() { if (_req != nullptr) { - _req->SubRef(); + _req->internal_subref(); } } MyReq(const MyReq &rhs) = delete; @@ -331,7 +331,7 @@ public: } ~Fixture() { - _target->SubRef(); + _target->internal_subref(); } }; @@ -410,7 +410,7 @@ TEST_F("require that a bad target gives connection error", Fixture()) { { FRT_Target *bad_target = f1.make_bad_target(); bad_target->InvokeSync(req.borrow(), timeout); - bad_target->SubRef(); + bad_target->internal_subref(); } EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_CONNECTION); } diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp index 2ccb44d03cb..94f57a136a4 100644 --- a/fnet/src/tests/frt/rpc/sharedblob.cpp +++ b/fnet/src/tests/frt/rpc/sharedblob.cpp @@ -122,7 +122,7 @@ struct ServerSampler : public FRT_Invokable dataSet.sample(*req->GetReturn()); // server return before drop // keep request to sample return after drop - req->AddRef(); + req->internal_addref(); serverReq = req; } }; @@ -176,7 +176,7 @@ TEST("testExplicitShared") { req->GetParams()->AddSharedData(&blob); EXPECT_EQUAL(4, blob.refcnt); - req->SubRef(); + req->internal_subref(); EXPECT_EQUAL(1, blob.refcnt); } @@ -262,10 +262,10 @@ TEST("testImplicitShared") { } if (serverSampler.serverReq != 0) { - serverSampler.serverReq->SubRef(); + serverSampler.serverReq->internal_subref(); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index 00075cb75dd..f57f7d78b07 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -63,9 +63,9 @@ TEST("info") { fprintf(stderr, "FD_SETSIZE: %d\n", l[2]._intval32); fprintf(stderr, "sizeof(FRT_RPCRequest): %d\n", l[3]._intval32); - target->SubRef(); - local_info->SubRef(); - remote_info->SubRef(); + target->internal_subref(); + local_info->internal_subref(); + remote_info->internal_subref(); }; TEST("size of important objects") @@ -77,7 +77,7 @@ TEST("size of important objects") #else constexpr size_t MUTEX_SIZE = 40u; #endif - EXPECT_EQUAL(MUTEX_SIZE + sizeof(std::string) + 112u, sizeof(FNET_IOComponent)); + EXPECT_EQUAL(MUTEX_SIZE + sizeof(std::string) + 120u, sizeof(FNET_IOComponent)); EXPECT_EQUAL(32u, sizeof(FNET_Channel)); EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock)); EXPECT_EQUAL(MUTEX_SIZE + sizeof(std::string) + 416u, sizeof(FNET_Connection)); diff --git a/fnet/src/tests/printstuff/printstuff_test.cpp b/fnet/src/tests/printstuff/printstuff_test.cpp index a9621728c5a..bd76ad29405 100644 --- a/fnet/src/tests/printstuff/printstuff_test.cpp +++ b/fnet/src/tests/printstuff/printstuff_test.cpp @@ -37,7 +37,7 @@ TEST("rpc packets in a queue") { q2.QueuePacket(&req->getStash().create<FRT_RPCRequestPacket>(req, 0, false), FNET_Context()); q2.Print(); } - req->SubRef(); + req->internal_subref(); } TEST("info") { diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp index a363b1df4c2..eaf2fd71bde 100644 --- a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp +++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp @@ -113,9 +113,9 @@ TEST_FF("transport layers can be run with transport debugger", Fixture(), vespal EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT); ASSERT_TRUE(req6->CheckReturnTypes("l")); EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u); - target->SubRef(); - req4->SubRef(); - req6->SubRef(); + target->internal_subref(); + req4->internal_subref(); + req6->internal_subref(); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index e344f2a22a6..fef8a6bf01b 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -63,7 +63,7 @@ struct DoHandshakeWork : vespalib::Executor::Task { DoHandshakeWork(FNET_Connection *conn_in, vespalib::CryptoSocket *socket_in) : conn(conn_in), socket(socket_in) { - conn->AddRef(); + conn->internal_addref(); } void run() override { socket->do_handshake_work(); @@ -82,7 +82,7 @@ FNET_Connection::ResolveHandler::ResolveHandler(FNET_Connection *conn) : connection(conn), address() { - connection->AddRef(); + connection->internal_addref(); } void @@ -94,7 +94,7 @@ FNET_Connection::ResolveHandler::handle_result(vespalib::SocketAddress result) FNET_Connection::ResolveHandler::~ResolveHandler() { - connection->SubRef(); + connection->internal_subref(); } @@ -149,10 +149,9 @@ FNET_Connection::SetState(State state) } if ( ! toDelete.empty() ) { - for (const FNET_Channel::UP & ch : toDelete) { - (void) ch; - SubRef_NoLock(); - } + const uint32_t cnt = toDelete.size(); + const uint32_t reserve = 1; + internal_subref(cnt, reserve); } } @@ -185,14 +184,14 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, _channels.Unregister(channel); if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) { - SubRef_NoLock(); + internal_subref(1, 1); toDelete.reset(channel); } } } else if (CanAcceptChannels() && IsFromPeer(chid)) { // open new channel FNET_Channel::UP newChannel(new FNET_Channel(chid, this)); channel = newChannel.get(); - AddRef_NoLock(); + internal_addref(); BeforeCallback(guard, channel); if (_serverAdapter->InitChannel(channel, pcode)) { @@ -203,7 +202,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, AfterCallback(guard); if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) { - SubRef_NoLock(); + internal_subref(1, 1); } else if (hp_rc == FNET_IPacketHandler::FNET_KEEP_CHANNEL) { _channels.Register(newChannel.release()); } else { @@ -212,7 +211,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, } else { AfterCallback(guard); - SubRef_NoLock(); + internal_subref(1, 1); guard.unlock(); LOG(debug, "Connection(%s): channel init failed", GetSpec()); @@ -489,8 +488,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _myQueue(256), _output(0), _channels(), - _callbackTarget(nullptr), - _cleanup(nullptr) + _callbackTarget(nullptr) { assert(_socket && (_socket->get_fd() >= 0)); _num_connections.fetch_add(1, std::memory_order_relaxed); @@ -520,8 +518,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _myQueue(256), _output(0), _channels(), - _callbackTarget(nullptr), - _cleanup(nullptr) + _callbackTarget(nullptr) { _num_connections.fetch_add(1, std::memory_order_relaxed); } @@ -530,7 +527,6 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, FNET_Connection::~FNET_Connection() { assert(!_resolve_handler); - assert(_cleanup == nullptr); _num_connections.fetch_sub(1, std::memory_order_relaxed); } @@ -576,12 +572,6 @@ FNET_Connection::handle_handshake_act() return ((GetState() == FNET_CONNECTING) && handshake()); } -void -FNET_Connection::SetCleanupHandler(FNET_IConnectionCleanupHandler *handler) -{ - _cleanup = handler; -} - FNET_Channel* FNET_Connection::OpenChannel(FNET_IPacketHandler *handler, @@ -598,7 +588,7 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler, *chid = newChannel->GetID(); } WaitCallback(guard, nullptr); - AddRef_NoLock(); + internal_addref(); ret = newChannel.release(); _channels.Register(ret); } @@ -614,7 +604,7 @@ FNET_Connection::OpenChannel() { std::lock_guard<std::mutex> guard(_ioc_lock); chid = GetNextID(); - AddRef_NoLock(); + internal_addref(); } return new FNET_Channel(chid, this); } @@ -633,18 +623,20 @@ void FNET_Connection::FreeChannel(FNET_Channel *channel) { delete channel; - SubRef_HasLock(std::unique_lock<std::mutex>(_ioc_lock)); + internal_subref(); } void FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel) { - std::unique_lock<std::mutex> guard(_ioc_lock); - WaitCallback(guard, channel); - _channels.Unregister(channel); - SubRef_HasLock(std::move(guard)); - delete channel; + { + std::unique_lock<std::mutex> guard(_ioc_lock); + WaitCallback(guard, channel); + _channels.Unregister(channel); + delete channel; + } + internal_subref(); } @@ -668,7 +660,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) _writeWork++; _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); if ((writeWork == 0) && (GetState() == FNET_CONNECTED)) { - AddRef_NoLock(); + internal_addref(); guard.unlock(); Owner()->EnableWrite(this, /* needRef = */ false); } @@ -686,16 +678,6 @@ FNET_Connection::Sync() void -FNET_Connection::CleanupHook() -{ - if (_cleanup != nullptr) { - _cleanup->Cleanup(this); - _cleanup = nullptr; - } -} - - -void FNET_Connection::Close() { _resolve_handler.reset(); diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 10cf74e79de..80927fd375c 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -21,30 +21,6 @@ class FNET_IPacketHandler; namespace vespalib::net { class ConnectionAuthContext; } /** - * Interface implemented by objects that want to perform connection - * cleanup. Use the SetCleanupHandler method to register with a - * connection. Currently, there can only be one cleanup handler per - * connection. - **/ -class FNET_IConnectionCleanupHandler -{ -public: - - /** - * Destructor. No cleanup needed for base class. - */ - virtual ~FNET_IConnectionCleanupHandler(void) {} - - /** - * Perform connection cleanup. - * - * @param conn the connection - **/ - virtual void Cleanup(FNET_Connection *conn) = 0; -}; - - -/** * This class represents a single connection with another * computer. The binary format on a connection is defined by the * PacketStreamer given to the constructor. Each connection object may @@ -115,8 +91,6 @@ private: FNET_ChannelLookup _channels; // channel 'DB' FNET_Channel *_callbackTarget; // target of current callback - FNET_IConnectionCleanupHandler *_cleanup; // cleanup handler - std::unique_ptr<vespalib::net::ConnectionAuthContext> _auth_context; static std::atomic<uint64_t> _num_connections; // total number of connections @@ -377,14 +351,6 @@ public: **/ bool handle_handshake_act() override; - /** - * Register a cleanup handler to be invoked when this connection is - * about to be destructed. - * - * @param handler the cleanup handler - **/ - void SetCleanupHandler(FNET_IConnectionCleanupHandler *handler); - /** * Open a new channel on this connection. This method will return @@ -468,14 +434,6 @@ public: /** - * Invoked by the io component superclass before the object is - * destructed. Will invoke the Cleanup method on the cleanup handler - * for this connection, if present. - **/ - void CleanupHook() override; - - - /** * Close this connection immidiately. NOTE: this method should only * be called by the transport thread. **/ diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp index d4b35720a8d..421bd0c4d0b 100644 --- a/fnet/src/vespa/fnet/frt/invoker.cpp +++ b/fnet/src/vespa/fnet/frt/invoker.cpp @@ -98,7 +98,7 @@ FRT_RPCInvoker::HandleDone(bool freeChannel) } // send response to client or get rid of it if (_noReply || (_req->GetErrorCode() == FRTE_RPC_BAD_REQUEST)) - _req->SubRef(); + _req->internal_subref(); else ch->Send(_req->CreateReplyPacket()); @@ -128,7 +128,7 @@ void FRT_HookInvoker::Invoke() _req->SetDetachedPT(&detached); (_hook->GetHandler()->*_hook->GetMethod())(_req); assert(!detached); - _req->SubRef(); + _req->internal_subref(); } void diff --git a/fnet/src/vespa/fnet/frt/packets.cpp b/fnet/src/vespa/fnet/frt/packets.cpp index 134a869eafb..5e600c8caa3 100644 --- a/fnet/src/vespa/fnet/frt/packets.cpp +++ b/fnet/src/vespa/fnet/frt/packets.cpp @@ -14,7 +14,7 @@ FRT_RPCPacket::Free() { if (_ownsRef) { _req->DiscardBlobs(); - _req->SubRef(); + _req->internal_subref(); } } diff --git a/fnet/src/vespa/fnet/frt/reflection.cpp b/fnet/src/vespa/fnet/frt/reflection.cpp index af7fa069eb9..c09057ea675 100644 --- a/fnet/src/vespa/fnet/frt/reflection.cpp +++ b/fnet/src/vespa/fnet/frt/reflection.cpp @@ -153,7 +153,7 @@ FRT_ReflectionBuilder::FRT_ReflectionBuilder(FRT_Supervisor *supervisor) FRT_ReflectionBuilder::~FRT_ReflectionBuilder() { Flush(); - _req->SubRef(); + _req->internal_subref(); } diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.cpp b/fnet/src/vespa/fnet/frt/rpcrequest.cpp index ac6dbb26ad6..6870a275ab1 100644 --- a/fnet/src/vespa/fnet/frt/rpcrequest.cpp +++ b/fnet/src/vespa/fnet/frt/rpcrequest.cpp @@ -10,7 +10,6 @@ FRT_RPCRequest::FRT_RPCRequest() _context(), _params(_stash), _return(_stash), - _refcnt(1), _completed(0), _errorCode(FRTE_NO_ERROR), _errorMessageLen(0), @@ -19,14 +18,10 @@ FRT_RPCRequest::FRT_RPCRequest() _methodName(nullptr), _detachedPT(nullptr), _abortHandler(nullptr), - _returnHandler(nullptr), - _cleanupHandler(nullptr) + _returnHandler(nullptr) { } -FRT_RPCRequest::~FRT_RPCRequest() -{ - assert(_refcnt == 0); -} +FRT_RPCRequest::~FRT_RPCRequest() = default; void FRT_RPCRequest::SetError(uint32_t errorCode, const char *errorMessage, uint32_t errorMessageLen) @@ -87,18 +82,9 @@ FRT_RPCRequest::GetConnection() { return _returnHandler->GetConnection(); } -void -FRT_RPCRequest::Cleanup() { - if (_cleanupHandler != nullptr) { - _cleanupHandler->HandleCleanup(); - _cleanupHandler = nullptr; - } -} void FRT_RPCRequest::Reset() { - assert(_refcnt <= 1); - Cleanup(); _context = FNET_Context(); _params.Reset(); _return.Reset(); @@ -118,7 +104,7 @@ FRT_RPCRequest::Reset() { bool FRT_RPCRequest::Recycle() { - if (_refcnt > 1 || _errorCode != FRTE_NO_ERROR) + if (count_refs() > 1 || _errorCode != FRTE_NO_ERROR) return false; Reset(); return true; @@ -126,18 +112,6 @@ FRT_RPCRequest::Recycle() void -FRT_RPCRequest::SubRef() -{ - int oldVal = _refcnt.fetch_sub(1); - assert(oldVal > 0); - if (oldVal == 1) { - Reset(); - delete this; - } -} - - -void FRT_RPCRequest::Print(uint32_t indent) { printf("%*sFRT_RPCRequest {\n", indent, ""); @@ -163,7 +137,7 @@ FRT_RPCRequest::CreateRequestPacket(bool wantReply) flags |= FLAG_FRT_RPC_LITTLE_ENDIAN; if (wantReply) - AddRef(); + internal_addref(); else flags |= FLAG_FRT_RPC_NOREPLY; diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.h b/fnet/src/vespa/fnet/frt/rpcrequest.h index a095c274687..72dae4a6af1 100644 --- a/fnet/src/vespa/fnet/frt/rpcrequest.h +++ b/fnet/src/vespa/fnet/frt/rpcrequest.h @@ -6,6 +6,7 @@ #include "error.h" #include <vespa/fnet/context.h> #include <vespa/vespalib/util/stash.h> +#include <vespa/vespalib/util/ref_counted.h> #include <atomic> class FNET_Packet; @@ -37,20 +38,7 @@ public: }; -class FRT_ICleanupHandler -{ -public: - - /** - * Destructor. No cleanup needed for base class. - */ - virtual ~FRT_ICleanupHandler(void) {} - - virtual void HandleCleanup() = 0; -}; - - -class FRT_RPCRequest +class FRT_RPCRequest : public vespalib::enable_ref_counted { private: using Stash = vespalib::Stash; @@ -58,7 +46,6 @@ private: FNET_Context _context; FRT_Values _params; FRT_Values _return; - std::atomic<int> _refcnt; std::atomic<int> _completed; uint32_t _errorCode; uint32_t _errorMessageLen; @@ -69,7 +56,6 @@ private: bool *_detachedPT; FRT_IAbortHandler *_abortHandler; FRT_IReturnHandler *_returnHandler; - FRT_ICleanupHandler *_cleanupHandler; public: FRT_RPCRequest(const FRT_RPCRequest &) = delete; @@ -86,9 +72,6 @@ public: _return.DiscardBlobs(); } - void AddRef() { _refcnt.fetch_add(1); } - void SubRef(); - void SetContext(FNET_Context context) { _context = context; } FNET_Context GetContext() { return _context; } @@ -137,10 +120,8 @@ public: void SetAbortHandler(FRT_IAbortHandler *handler) { _abortHandler = handler; } void SetReturnHandler(FRT_IReturnHandler *handler) { _returnHandler = handler; } - void SetCleanupHandler(FRT_ICleanupHandler *handler) { _cleanupHandler = handler; } bool Abort(); void Return(); FNET_Connection *GetConnection(); - void Cleanup(); }; diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index 966c606bf97..7d6b4d727c7 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -31,7 +31,7 @@ FRT_Supervisor::~FRT_Supervisor() { _transport->detach(this); if (_connector != nullptr) { - _connector->SubRef(); + _connector->internal_subref(); } } @@ -99,7 +99,7 @@ FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein) if (tradein->Recycle()) { return tradein; } - tradein->SubRef(); + tradein->internal_subref(); } return new FRT_RPCRequest(); } @@ -113,7 +113,7 @@ FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req) ch->Send(req->CreateRequestPacket(false)); ch->Free(); } else { - req->SubRef(); + req->internal_subref(); } } diff --git a/fnet/src/vespa/fnet/frt/target.cpp b/fnet/src/vespa/fnet/frt/target.cpp index 1645fba91ee..47baa95b9e2 100644 --- a/fnet/src/vespa/fnet/frt/target.cpp +++ b/fnet/src/vespa/fnet/frt/target.cpp @@ -6,7 +6,6 @@ FRT_Target::~FRT_Target() { - assert(_refcnt == 0); FNET_Connection * conn(_conn); _conn = nullptr; if (conn != nullptr) { diff --git a/fnet/src/vespa/fnet/frt/target.h b/fnet/src/vespa/fnet/frt/target.h index ac6bf377a14..773daaca8b1 100644 --- a/fnet/src/vespa/fnet/frt/target.h +++ b/fnet/src/vespa/fnet/frt/target.h @@ -3,16 +3,16 @@ #pragma once #include <vespa/fnet/connection.h> +#include <vespa/vespalib/util/ref_counted.h> #include <atomic> class FNET_Scheduler; class FRT_RPCRequest; class FRT_IRequestWait; -class FRT_Target +class FRT_Target : public vespalib::enable_ref_counted { private: - std::atomic<int> _refcnt; FNET_Scheduler *_scheduler; FNET_Connection *_conn; @@ -21,23 +21,12 @@ private: public: FRT_Target(FNET_Scheduler *scheduler, FNET_Connection *conn) - : _refcnt(1), - _scheduler(scheduler), + : _scheduler(scheduler), _conn(conn) {} ~FRT_Target(); FNET_Connection *GetConnection() const { return _conn; } - - void AddRef() { _refcnt.fetch_add(1); } - void SubRef() { - if (_refcnt.fetch_sub(1) == 1) { - delete this; - } - } - - int GetRefCnt() const { return _refcnt; } - bool IsValid() { return ((_conn != nullptr) && (_conn->GetState() <= FNET_Connection::FNET_CONNECTED)); diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index f08718c0c5c..c4fc7d859d4 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -16,7 +16,6 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner, _ioc_spec(spec), _flags(shouldTimeOut), _ioc_socket_fd(socket_fd), - _ioc_refcnt(1), _ioc_timestamp(vespalib::steady_clock::now()), _ioc_lock(), _ioc_cond() @@ -39,58 +38,6 @@ FNET_IOComponent::UpdateTimeOut() { _ioc_owner->UpdateTimeOut(this); } -void -FNET_IOComponent::AddRef() -{ - std::lock_guard<std::mutex> guard(_ioc_lock); - assert(_ioc_refcnt > 0); - _ioc_refcnt++; -} - - -void -FNET_IOComponent::AddRef_NoLock() -{ - assert(_ioc_refcnt > 0); - _ioc_refcnt++; -} - - -void -FNET_IOComponent::SubRef() -{ - { - std::lock_guard<std::mutex> guard(_ioc_lock); - assert(_ioc_refcnt > 0); - if (--_ioc_refcnt > 0) { - return; - } - } - CleanupHook(); - delete this; -} - - -void -FNET_IOComponent::SubRef_HasLock(std::unique_lock<std::mutex> guard) -{ - assert(_ioc_refcnt > 0); - if (--_ioc_refcnt > 0) { - return; - } - guard.unlock(); - CleanupHook(); - delete this; -} - - -void -FNET_IOComponent::SubRef_NoLock() -{ - assert(_ioc_refcnt > 1); - _ioc_refcnt--; -} - void FNET_IOComponent::attach_selector(Selector &selector) @@ -141,8 +88,3 @@ FNET_IOComponent::handle_handshake_act() { return true; } - -void -FNET_IOComponent::CleanupHook() -{ -} diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index 106e31c9236..b88b2700db5 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -4,6 +4,7 @@ #include "scheduler.h" #include <vespa/vespalib/net/selector.h> +#include <vespa/vespalib/util/ref_counted.h> #include <mutex> #include <condition_variable> #include <chrono> @@ -18,7 +19,7 @@ class FNET_Config; * Components do IO against the network and that they use sockets to * perform that IO. **/ -class FNET_IOComponent +class FNET_IOComponent : public vespalib::enable_ref_counted { friend class FNET_TransportThread; @@ -46,7 +47,6 @@ protected: std::string _ioc_spec; // connect/listen spec Flags _flags; // Compressed representation of boolean flags; int _ioc_socket_fd; // source of events. - uint32_t _ioc_refcnt; // reference counter vespalib::steady_time _ioc_timestamp; // last I/O activity std::mutex _ioc_lock; // synchronization std::condition_variable _ioc_cond; // synchronization @@ -88,43 +88,6 @@ public: std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_ioc_lock); } /** - * Allocate a reference to this component. This method locks the - * object to protect the reference counter. - **/ - void AddRef(); - - - /** - * Allocate a reference to this component without locking the - * object. Caller already has lock on object. - **/ - void AddRef_NoLock(); - - - /** - * Free a reference to this component. This method locks the object - * to protect the reference counter. - **/ - void SubRef(); - - - /** - * Free a reference to this component. This method uses locking to - * protect the reference counter, but assumes that the lock has - * already been obtained when this method is called. - **/ - void SubRef_HasLock(std::unique_lock<std::mutex> guard); - - - /** - * Free a reference to this component without locking the - * object. NOTE: this method may only be called on objects with more - * than one reference. - **/ - void SubRef_NoLock(); - - - /** * @return the owning TransportThread object. **/ FNET_TransportThread *Owner() { return _ioc_owner; } @@ -217,13 +180,6 @@ public: **/ virtual bool handle_handshake_act(); - /** - * This method is called by the SubRef methods just before the - * object is deleted. It may be used to perform cleanup tasks that - * must be done before the destructor is invoked. - **/ - virtual void CleanupHook(); - /** * Close this component immediately. NOTE: this method should only diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 970dc40150f..0b0df02c04c 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -107,7 +107,7 @@ FNET_TransportThread::FlushDeleteList() FNET_IOComponent *tmp = _deleteList; _deleteList = tmp->_ioc_next; assert(tmp->_flags._ioc_delete); - tmp->SubRef(); + tmp->internal_subref(); } } @@ -143,12 +143,12 @@ FNET_TransportThread::DiscardEvent(FNET_ControlPacket *cpacket, switch (cpacket->GetCommand()) { case FNET_ControlPacket::FNET_CMD_IOC_ADD: context._value.IOC->Close(); - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); break; case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE: case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT: case FNET_ControlPacket::FNET_CMD_IOC_CLOSE: - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); break; } } @@ -173,7 +173,7 @@ FNET_TransportThread::handle_close_cmd(FNET_IOComponent *ioc) { if (ioc->_flags._ioc_added) { RemoveComponent(ioc); - ioc->SubRef(); + ioc->internal_subref(); } ioc->Close(); AddDeleteComponent(ioc); @@ -288,7 +288,7 @@ FNET_TransportThread::Listen(const char *spec, FNET_IPacketStreamer *streamer, if (server_socket.valid() && server_socket.set_blocking(false)) { FNET_Connector *connector = new FNET_Connector(this, streamer, serverAdapter, spec, std::move(server_socket)); connector->EnableReadEvent(true); - connector->AddRef_NoLock(); + connector->internal_addref(); Add(connector, /* needRef = */ false); return connector; } @@ -314,7 +314,7 @@ void FNET_TransportThread::Add(FNET_IOComponent *comp, bool needRef) { if (needRef) { - comp->AddRef(); + comp->internal_addref(); } PostEvent(&FNET_ControlPacket::IOCAdd, FNET_Context(comp)); } @@ -324,7 +324,7 @@ void FNET_TransportThread::EnableWrite(FNET_IOComponent *comp, bool needRef) { if (needRef) { - comp->AddRef(); + comp->internal_addref(); } PostEvent(&FNET_ControlPacket::IOCEnableWrite, FNET_Context(comp)); } @@ -333,7 +333,7 @@ void FNET_TransportThread::handshake_act(FNET_IOComponent *comp, bool needRef) { if (needRef) { - comp->AddRef(); + comp->internal_addref(); } PostEvent(&FNET_ControlPacket::IOCHandshakeACT, FNET_Context(comp)); } @@ -342,7 +342,7 @@ void FNET_TransportThread::Close(FNET_IOComponent *comp, bool needRef) { if (needRef) { - comp->AddRef(); + comp->internal_addref(); } PostEvent(&FNET_ControlPacket::IOCClose, FNET_Context(comp)); } @@ -449,7 +449,7 @@ FNET_TransportThread::handle_wakeup() } if (context._value.IOC->_flags._ioc_delete) { - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); continue; } @@ -460,14 +460,14 @@ FNET_TransportThread::handle_wakeup() case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE: context._value.IOC->EnableWriteEvent(true); if (context._value.IOC->HandleWriteEvent()) { - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); } else { handle_close_cmd(context._value.IOC); } break; case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT: if (context._value.IOC->handle_handshake_act()) { - context._value.IOC->SubRef(); + context._value.IOC->internal_subref(); } else { handle_close_cmd(context._value.IOC); } @@ -577,7 +577,7 @@ FNET_TransportThread::endEventLoop() { component = component->_ioc_next; RemoveComponent(tmp); tmp->Close(); - tmp->SubRef(); + tmp->internal_subref(); } assert(_componentsHead == nullptr && _componentsTail == nullptr && diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 1744a3d60e5..6047d4e3482 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -97,7 +97,7 @@ private: /** - * Delete (call SubRef on) all IO Components in the delete list. + * Delete (call internal_subref on) all IO Components in the delete list. **/ void FlushDeleteList(); @@ -277,7 +277,7 @@ public: * @param needRef should be set to false if the caller of this * method already has obtained an extra reference to the * component. If this flag is true, this method will call the - * AddRef method on the component. + * internal_addref method on the component. **/ void Add(FNET_IOComponent *comp, bool needRef = true); @@ -294,7 +294,7 @@ public: * @param needRef should be set to false if the caller of this * method already has obtained an extra reference to the * component. If this flag is true, this method will call the - * AddRef method on the component. + * internal_addref method on the component. **/ void EnableWrite(FNET_IOComponent *comp, bool needRef = true); @@ -312,7 +312,7 @@ public: * @param needRef should be set to false if the caller of this * method already has obtained an extra reference to the * component. If this flag is true, this method will call the - * AddRef method on the component. + * internal_addref method on the component. **/ void handshake_act(FNET_IOComponent *comp, bool needRef = true); @@ -330,7 +330,7 @@ public: * @param needRef should be set to false if the caller of this * method already has obtained an extra reference to the * component. If this flag is true, this method will call the - * AddRef method on the component. + * internal_addref method on the component. **/ void Close(FNET_IOComponent *comp, bool needRef = true); diff --git a/jrt_test/src/tests/echo/echo-client.cpp b/jrt_test/src/tests/echo/echo-client.cpp index a7c8d309114..16a4a2877b2 100644 --- a/jrt_test/src/tests/echo/echo-client.cpp +++ b/jrt_test/src/tests/echo/echo-client.cpp @@ -79,8 +79,8 @@ public: } else { printf("Return values != parameters.\n"); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return 0; } }; diff --git a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp index cdcf02da465..fae7bc90abb 100644 --- a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp +++ b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp @@ -14,16 +14,16 @@ public: void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor) { if ((*req) != nullptr) - (*req)->SubRef(); + (*req)->internal_subref(); (*req) = supervisor->AllocRPCRequest(); } void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2) { if (r1 != nullptr) - r1->SubRef(); + r1->internal_subref(); if (r2 != nullptr) - r2->SubRef(); + r2->internal_subref(); } void DumpMethodInfo(const char *indent, FRT_RPCRequest *info, @@ -90,7 +90,7 @@ public: break; } std::this_thread::sleep_for(1s); - target->SubRef(); + target->internal_subref(); target = supervisor.GetTarget(argv[1]); } if (info->IsError()) { @@ -133,7 +133,7 @@ public: m_list->GetErrorMessage()); } FreeReqs(m_list, info); - target->SubRef(); + target->internal_subref(); return 0; } }; diff --git a/jrt_test/src/tests/rpc-error/test-errors.cpp b/jrt_test/src/tests/rpc-error/test-errors.cpp index e64c2abfff6..1683d1e11f8 100644 --- a/jrt_test/src/tests/rpc-error/test-errors.cpp +++ b/jrt_test/src/tests/rpc-error/test-errors.cpp @@ -20,7 +20,7 @@ public: } void fini() { - target->SubRef(); + target->internal_subref(); target = nullptr; client = nullptr; } @@ -51,7 +51,7 @@ TestErrors::testNoError() } else { EXPECT_TRUE(false); } - req1->SubRef(); + req1->internal_subref(); } @@ -64,7 +64,7 @@ TestErrors::testNoSuchMethod() EXPECT_TRUE(req1->IsError()); EXPECT_TRUE(0 == req1->GetReturn()->GetNumValues()); EXPECT_TRUE(FRTE_RPC_NO_SUCH_METHOD == req1->GetErrorCode()); - req1->SubRef(); + req1->internal_subref(); } @@ -80,7 +80,7 @@ TestErrors::testWrongParameters() EXPECT_TRUE(req1->IsError()); EXPECT_TRUE(0 == req1->GetReturn()->GetNumValues()); EXPECT_TRUE(FRTE_RPC_WRONG_PARAMS == req1->GetErrorCode()); - req1->SubRef(); + req1->internal_subref(); FRT_RPCRequest *req2 = client->AllocRPCRequest(); req2->SetMethodName("test"); @@ -90,7 +90,7 @@ TestErrors::testWrongParameters() EXPECT_TRUE(req2->IsError()); EXPECT_TRUE(0 == req2->GetReturn()->GetNumValues()); EXPECT_TRUE(FRTE_RPC_WRONG_PARAMS == req2->GetErrorCode()); - req2->SubRef(); + req2->internal_subref(); FRT_RPCRequest *req3 = client->AllocRPCRequest(); req3->SetMethodName("test"); @@ -102,7 +102,7 @@ TestErrors::testWrongParameters() EXPECT_TRUE(req3->IsError()); EXPECT_TRUE(0 == req3->GetReturn()->GetNumValues()); EXPECT_TRUE(FRTE_RPC_WRONG_PARAMS == req3->GetErrorCode()); - req3->SubRef(); + req3->internal_subref(); } @@ -118,7 +118,7 @@ TestErrors::testWrongReturnValues() EXPECT_TRUE(req1->IsError()); EXPECT_TRUE(0 == req1->GetReturn()->GetNumValues()); EXPECT_TRUE(FRTE_RPC_WRONG_RETURN == req1->GetErrorCode()); - req1->SubRef(); + req1->internal_subref(); } @@ -134,7 +134,7 @@ TestErrors::testMethodFailed() EXPECT_TRUE(req1->IsError()); EXPECT_TRUE(0 == req1->GetReturn()->GetNumValues()); EXPECT_TRUE(75000 == req1->GetErrorCode()); - req1->SubRef(); + req1->internal_subref(); FRT_RPCRequest *req2 = client->AllocRPCRequest(); req2->SetMethodName("test"); @@ -145,7 +145,7 @@ TestErrors::testMethodFailed() EXPECT_TRUE(req2->IsError()); EXPECT_TRUE(0 == req2->GetReturn()->GetNumValues()); EXPECT_TRUE(75000 == req2->GetErrorCode()); - req2->SubRef(); + req2->internal_subref(); } diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp index fbd39afeba4..b89eb87e2f7 100644 --- a/logd/src/logd/rpc_forwarder.cpp +++ b/logd/src/logd/rpc_forwarder.cpp @@ -31,7 +31,7 @@ public: : _request(new FRT_RPCRequest()) {} ~GuardedRequest() { - _request->SubRef(); + _request->internal_subref(); } FRT_RPCRequest& operator*() const { return *_request; } FRT_RPCRequest* get() const { return _request; } diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h index 75d41259eda..864c7b666cb 100644 --- a/logd/src/logd/rpc_forwarder.h +++ b/logd/src/logd/rpc_forwarder.h @@ -17,7 +17,7 @@ struct Metrics; struct RpcTargetSubRef { void operator()(FRT_Target* target) const noexcept { - target->SubRef(); + target->internal_subref(); } }; using RpcTargetGuard = std::unique_ptr<FRT_Target, RpcTargetSubRef>; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 8c67424d5f2..e12313af53c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -93,7 +93,7 @@ RPCSend::handleDiscard(Context ctx) ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR)); FRT_RPCRequest &req = tmp->getRequest(); FNET_Channel *chn = req.GetContext()._value.CHANNEL; - req.SubRef(); + req.internal_subref(); chn->Free(); } @@ -189,7 +189,7 @@ RPCSend::doRequestDone(FRT_RPCRequest *req) { reply->addError(error); } _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient()); - req->SubRef(); + req->internal_subref(); } std::unique_ptr<Reply> diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index 656ab081652..9c6ca9dff69 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -19,7 +19,7 @@ RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb) : RPCTarget::~RPCTarget() { - _target.SubRef(); + _target.internal_subref(); } void @@ -94,7 +94,7 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) _state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED); } _cond.notify_all(); - req->SubRef(); + req->internal_subref(); } } // namespace mbus diff --git a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp index 8eb56d218fb..c88e4a283b5 100644 --- a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp +++ b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp @@ -72,11 +72,11 @@ public: void finiRPC() { if (_req != nullptr) { - _req->SubRef(); + _req->internal_subref(); _req = nullptr; } if (_target != nullptr) { - _target->SubRef(); + _target->internal_subref(); _target = nullptr; } if (_frt) { diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp index 606188f72c2..4c13ff7d762 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp @@ -56,7 +56,7 @@ BmClusterController::propagate_cluster_state(uint32_t node_idx, bool distributor auto target = target_resolver->resolve_rpc_target(storage_address, fake_bucket_id); target->get()->InvokeSync(req, 10.0); // 10 seconds timeout assert(!req->IsError()); - req->SubRef(); + req->internal_subref(); } void diff --git a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp index 9c5e000c314..09fb0981a24 100644 --- a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp +++ b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp @@ -124,8 +124,8 @@ TEST_F(ProtoRpcAdapterTest, require_that_plain_rpc_ping_works) { req->SetMethodName("frt.rpc.ping"); target->InvokeSync(req, 60.0); EXPECT_TRUE(req->CheckReturnTypes("")); - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); } TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_search_works) { @@ -145,9 +145,9 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_search_works) { EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online")); adapter.set_online(); } - rpc->SubRef(); + rpc->internal_subref(); } - target->SubRef(); + target->internal_subref(); SearchProtocolMetrics &metrics = adapter.metrics(); EXPECT_EQ(metrics.query().latency.getCount(), 2); EXPECT_GT(metrics.query().latency.getTotal(), 0.0); @@ -180,9 +180,9 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_getDocsums_works) { EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online")); adapter.set_online(); } - rpc->SubRef(); + rpc->internal_subref(); } - target->SubRef(); + target->internal_subref(); SearchProtocolMetrics &metrics = adapter.metrics(); EXPECT_EQ(metrics.query().latency.getCount(), 0); EXPECT_EQ(metrics.docsum().latency.getCount(), 2); @@ -208,9 +208,9 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_ping_works) { EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online")); adapter.set_online(); } - rpc->SubRef(); + rpc->internal_subref(); } - target->SubRef(); + target->internal_subref(); SearchProtocolMetrics &metrics = adapter.metrics(); EXPECT_EQ(metrics.query().latency.getCount(), 0); EXPECT_EQ(metrics.docsum().latency.getCount(), 0); diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp index c44edfcec86..f5cc3c0b247 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp @@ -54,7 +54,7 @@ Session::commit(const vespalib::ConstBufferRef & buf) int retcode = _tlc.rpc(req); retval = (retcode == 0); if (retval) { - req->SubRef(); + req->internal_subref(); } else { vespalib::string msg; if (req->GetReturn() != nullptr) { @@ -62,7 +62,7 @@ Session::commit(const vespalib::ConstBufferRef & buf) } else { msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); } - req->SubRef(); + req->internal_subref(); throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str())); } } @@ -81,7 +81,7 @@ Session::status(SerialNum & b, SerialNum & e, size_t & count) e = req->GetReturn()->GetValue(2)._intval64; count = req->GetReturn()->GetValue(3)._intval64; } - req->SubRef(); + req->internal_subref(); return (retval == 0); } @@ -93,7 +93,7 @@ Session::erase(const SerialNum & to) req->GetParams()->AddString(_domain.c_str()); req->GetParams()->AddInt64(to); int32_t retval(_tlc.rpc(req)); - req->SubRef(); + req->internal_subref(); if (retval == 1) { LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to); } @@ -113,7 +113,7 @@ Session::sync(const SerialNum &syncTo, SerialNum &syncedTo) if (retval == 0) { syncedTo = req->GetReturn()->GetValue(1)._intval64; } - req->SubRef(); + req->internal_subref(); return (retval == 0); } @@ -138,7 +138,7 @@ bool Session::init(FRT_RPCRequest *req) { int32_t retval(_tlc.rpc(req)); - req->SubRef(); + req->internal_subref(); if (retval > 0) { clear(); _sessionId = retval; @@ -171,7 +171,7 @@ Session::run() req->GetParams()->AddString(_domain.c_str()); req->GetParams()->AddInt32(_sessionId); int32_t retval(_tlc.rpc(req)); - req->SubRef(); + req->internal_subref(); return (retval == 0); } @@ -188,7 +188,7 @@ Session::close() if ( (retval = _tlc.rpc(req)) > 0) { std::this_thread::sleep_for(10ms); } - req->SubRef(); + req->internal_subref(); } while ( retval == 1 ); } return (retval == 0); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp index 17f06b189c6..8916a4cf0b5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp @@ -80,7 +80,7 @@ void TransLogClient::disconnect() { if (_target) { - _target->SubRef(); + _target->internal_subref(); } } @@ -91,7 +91,7 @@ TransLogClient::create(const vespalib::string & domain) req->SetMethodName("createDomain"); req->GetParams()->AddString(domain.c_str()); int32_t retval(rpc(req)); - req->SubRef(); + req->internal_subref(); return (retval == 0); } @@ -102,7 +102,7 @@ TransLogClient::remove(const vespalib::string & domain) req->SetMethodName("deleteDomain"); req->GetParams()->AddString(domain.c_str()); int32_t retval(rpc(req)); - req->SubRef(); + req->internal_subref(); return (retval == 0); } @@ -113,7 +113,7 @@ TransLogClient::open(const vespalib::string & domain) req->SetMethodName("openDomain"); req->GetParams()->AddString(domain.c_str()); int32_t retval(rpc(req)); - req->SubRef(); + req->internal_subref(); if (retval == 0) { return std::make_unique<Session>(domain, *this); } @@ -138,7 +138,7 @@ TransLogClient::listDomains(std::vector<vespalib::string> & dir) dir.push_back(d); } } - req->SubRef(); + req->internal_subref(); return (retval == 0); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index c96b0cdcd61..db02f4f037e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -361,9 +361,9 @@ public: RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection) : _supervisor(supervisor), _connection(connection), _ok(true) { - _connection->AddRef(); + _connection->internal_addref(); } - ~RPCDestination() override { _connection->SubRef(); } + ~RPCDestination() override { _connection->internal_subref(); } bool ok() const override { return _ok; @@ -395,7 +395,7 @@ private: if ( ! ((retval == client::RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); } - req->SubRef(); + req->internal_subref(); return (retval == client::RPC::OK); } diff --git a/slobrok/src/apps/check_slobrok/check_slobrok.cpp b/slobrok/src/apps/check_slobrok/check_slobrok.cpp index 1b69588a9fc..4e10c9ba6fe 100644 --- a/slobrok/src/apps/check_slobrok/check_slobrok.cpp +++ b/slobrok/src/apps/check_slobrok/check_slobrok.cpp @@ -55,7 +55,7 @@ void Slobrok_Checker::finiRPC() { if (_target != nullptr) { - _target->SubRef(); + _target->internal_subref(); _target = nullptr; } if (_server) { diff --git a/slobrok/src/apps/sbcmd/sbcmd.cpp b/slobrok/src/apps/sbcmd/sbcmd.cpp index 64c29cd92a9..51c8b81aa77 100644 --- a/slobrok/src/apps/sbcmd/sbcmd.cpp +++ b/slobrok/src/apps/sbcmd/sbcmd.cpp @@ -65,7 +65,7 @@ void Slobrok_CMD::finiRPC() { if (_target != nullptr) { - _target->SubRef(); + _target->internal_subref(); _target = nullptr; } if (_server) { @@ -181,7 +181,7 @@ Slobrok_CMD::main(int argc, char **argv) } } } - req->SubRef(); + req->internal_subref(); finiRPC(); return 0; } diff --git a/slobrok/src/tests/mirrorapi/mirrorapi.cpp b/slobrok/src/tests/mirrorapi/mirrorapi.cpp index 2e03ba52bd4..6dac6b22b05 100644 --- a/slobrok/src/tests/mirrorapi/mirrorapi.cpp +++ b/slobrok/src/tests/mirrorapi/mirrorapi.cpp @@ -66,8 +66,8 @@ Server::reg() FRT_Target *sb = _server.supervisor().GetTarget(_slobrokSpec.c_str()); sb->InvokeSync(req, 5.0); - sb->SubRef(); - req->SubRef(); + sb->internal_subref(); + req->internal_subref(); } diff --git a/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp b/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp index 13db95eb35c..0493a43adf3 100644 --- a/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp +++ b/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp @@ -21,11 +21,11 @@ struct Server : FRT_Invokable { FNET_Connection *last_conn; void set_last_conn(FNET_Connection *conn) { if (last_conn) { - last_conn->SubRef(); + last_conn->internal_subref(); } last_conn = conn; if (last_conn) { - last_conn->AddRef(); + last_conn->internal_addref(); } } Server(fnet::TimeTools::SP time_tools) @@ -210,7 +210,7 @@ TEST_F(RpcMappingMonitorTest, up_connection_is_reused) { a.last_conn = nullptr; EXPECT_TRUE(debugger.step_until([&]() { return (a.last_conn); })); EXPECT_EQ(a.last_conn, my_conn); - my_conn->SubRef(); + my_conn->internal_subref(); EXPECT_EQ(hist.map[foo_a].state(), State::UP); } diff --git a/slobrok/src/tests/standalone/standalone.cpp b/slobrok/src/tests/standalone/standalone.cpp index 33813fceb3a..653e4c64b0e 100644 --- a/slobrok/src/tests/standalone/standalone.cpp +++ b/slobrok/src/tests/standalone/standalone.cpp @@ -77,7 +77,7 @@ private: public: SubReferer(T* &t) : _t(t) {} ~SubReferer() { - if (_t != nullptr) _t->SubRef(); + if (_t != nullptr) _t->internal_subref(); } }; @@ -134,7 +134,7 @@ TEST("standalone") { } fprintf(stderr, "ping failed [retry %d]\n", retry); std::this_thread::sleep_for(200ms); - sb->SubRef(); + sb->internal_subref(); sb = orb.GetTarget(18541); } ASSERT_TRUE(checkOk(req)); diff --git a/slobrok/src/tests/startsome/rpc_info.cpp b/slobrok/src/tests/startsome/rpc_info.cpp index a37cfa24889..be6d59f6a81 100644 --- a/slobrok/src/tests/startsome/rpc_info.cpp +++ b/slobrok/src/tests/startsome/rpc_info.cpp @@ -13,16 +13,16 @@ public: void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor) { if ((*req) != nullptr) - (*req)->SubRef(); + (*req)->internal_subref(); (*req) = supervisor->AllocRPCRequest(); } void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2) { if (r1 != nullptr) - r1->SubRef(); + r1->internal_subref(); if (r2 != nullptr) - r2->SubRef(); + r2->internal_subref(); } void DumpMethodInfo(const char *indent, FRT_RPCRequest *info, @@ -123,7 +123,7 @@ public: m_list->GetErrorMessage()); } FreeReqs(m_list, info); - target->SubRef(); + target->internal_subref(); return 0; } }; diff --git a/slobrok/src/vespa/slobrok/sbmirror.cpp b/slobrok/src/vespa/slobrok/sbmirror.cpp index 62d288d40dc..3936b4dac4c 100644 --- a/slobrok/src/vespa/slobrok/sbmirror.cpp +++ b/slobrok/src/vespa/slobrok/sbmirror.cpp @@ -44,10 +44,10 @@ MirrorAPI::~MirrorAPI() _configurator.reset(0); if (_req != 0) { _req->Abort(); - _req->SubRef(); + _req->internal_subref(); } if (_target != 0) { - _target->SubRef(); + _target->internal_subref(); } } @@ -208,7 +208,7 @@ MirrorAPI::handleReconfig() std::string cps = _slobrokSpecs.logString(); LOG(warning, "current server %s not in list of location brokers: %s", _currSlobrok.c_str(), cps.c_str()); - _target->SubRef(); + _target->internal_subref(); _target = 0; } } @@ -224,7 +224,7 @@ MirrorAPI::handleReqDone() if (reconn) { if (_target != 0) { - _target->SubRef(); + _target->internal_subref(); } _target = 0; } else { diff --git a/slobrok/src/vespa/slobrok/sbregister.cpp b/slobrok/src/vespa/slobrok/sbregister.cpp index 925d4ea62bc..e7db255c5d6 100644 --- a/slobrok/src/vespa/slobrok/sbregister.cpp +++ b/slobrok/src/vespa/slobrok/sbregister.cpp @@ -90,10 +90,10 @@ RegisterAPI::~RegisterAPI() _configurator.reset(0); if (_req != 0) { _req->Abort(); - _req->SubRef(); + _req->internal_subref(); } if (_target != 0) { - _target->SubRef(); + _target->internal_subref(); } } @@ -139,7 +139,7 @@ RegisterAPI::handleReqDone() // unexpected error; close our connection to this // slobrok server and try again with a fresh slate if (_target != 0) { - _target->SubRef(); + _target->internal_subref(); } _target = 0; _busy.store(true, std::memory_order_relaxed); @@ -159,7 +159,7 @@ RegisterAPI::handleReqDone() // reset backoff strategy on any successful request _backOff.reset(); } - _req->SubRef(); + _req->internal_subref(); _req = 0; } } @@ -173,7 +173,7 @@ RegisterAPI::handleReconnect() vespalib::string cps = _slobrokSpecs.logString(); LOG(warning, "[RPC @ %s] location broker %s removed, will disconnect and use one of: %s", createSpec(_orb).c_str(), _currSlobrok.c_str(), cps.c_str()); - _target->SubRef(); + _target->internal_subref(); _target = 0; } } diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp index 4b8c1c02252..94def0271c8 100644 --- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp +++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp @@ -160,7 +160,7 @@ ExchangeManager::WorkPackage::WorkItem::RequestDone(FRT_RPCRequest *req) LOG(warning, "error doing workitem: %s", req->GetErrorMessage()); // XXX tell remslob? } - req->SubRef(); + req->internal_subref(); _pendingReq = nullptr; _pkg.doneItem(denied); } diff --git a/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp b/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp index 12761c0e6e9..a7013f6e04e 100644 --- a/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp +++ b/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp @@ -59,7 +59,7 @@ ManagedRpcServer::cleanupMonitor() { _monitor.disable(); if (_monitoredServer != nullptr) { - _monitoredServer->SubRef(); + _monitoredServer->internal_subref(); _monitoredServer = nullptr; } if (_checkServerReq != nullptr) { @@ -100,7 +100,7 @@ ManagedRpcServer::RequestDone(FRT_RPCRequest *req) if (req->GetErrorCode() == FRTE_RPC_ABORT) { LOG(debug, "rpcserver[%s].check aborted", getName().c_str()); - req->SubRef(); + req->internal_subref(); _checkServerReq = nullptr; return; } @@ -119,7 +119,7 @@ ManagedRpcServer::RequestDone(FRT_RPCRequest *req) } else { errmsg = "checkServer failed validation"; } - req->SubRef(); + req->internal_subref(); _checkServerReq = nullptr; cleanupMonitor(); _mmanager.notifyFailedRpcSrv(this, errmsg); @@ -130,7 +130,7 @@ ManagedRpcServer::RequestDone(FRT_RPCRequest *req) LOG_ASSERT(_monitoredServer != nullptr); _monitor.enable(_monitoredServer); - req->SubRef(); + req->internal_subref(); _checkServerReq = nullptr; _mmanager.notifyOkRpcSrv(this); } diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp index aa891ae0048..0f6fffb7f4f 100644 --- a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp +++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp @@ -32,7 +32,7 @@ void RemoteSlobrok::shutdown() { _reconnecter.disable(); if (_remote != nullptr) { - _remote->SubRef(); + _remote->internal_subref(); _remote = nullptr; } @@ -110,7 +110,7 @@ void RemoteSlobrok::handleFetchResult() { _serviceMapMirror.clear(); success = false; } - _remFetchReq->SubRef(); + _remFetchReq->internal_subref(); _remFetchReq = nullptr; if (success) { maybeStartFetch(); @@ -132,12 +132,12 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req) const char *myspec = args[1]._string._str; LOG(info, "addPeer(%s, %s) on remote slobrok %s at %s: %s", myname, myspec, getName().c_str(), getSpec().c_str(), req->GetErrorMessage()); - req->SubRef(); + req->internal_subref(); _remAddPeerReq = nullptr; fail(); return; } - req->SubRef(); + req->internal_subref(); _remAddPeerReq = nullptr; } else { LOG(error, "got unknown request back in RequestDone()"); @@ -166,7 +166,7 @@ RemoteSlobrok::fail() { // disconnect if (_remote != nullptr) { - _remote->SubRef(); + _remote->internal_subref(); _remote = nullptr; } // schedule reconnect attempt diff --git a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp index bac1ab34574..ad410eb93e8 100644 --- a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp @@ -67,7 +67,7 @@ struct FixtureBase { // instance _before_ we destroy the request itself. dispatcher._enqueued.clear(); if (bound_request) { - bound_request->SubRef(); + bound_request->internal_subref(); } } }; diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 9a98a40e7eb..bfc22b9f1ea 100644 --- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -204,8 +204,8 @@ public: EXPECT_TRUE(req->IsError()); EXPECT_EQ(req->GetErrorCode(), FRTE_RPC_METHOD_FAILED); EXPECT_EQ(req->GetErrorMessage(), expected_msg); - target->SubRef(); - req->SubRef(); + target->internal_subref(); + req->internal_subref(); } }; diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index 3f015d91a4a..5e4cb9d3026 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -35,7 +35,7 @@ public: _spec(spec) {} ~RpcTargetImpl() override { - _target->SubRef(); + _target->internal_subref(); } FRT_Target* get() noexcept override { return _target; } bool is_valid() const noexcept override { return _target->IsValid(); } diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index bcb5dbab279..e494f4e67da 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -83,7 +83,7 @@ namespace { struct SubRefDeleter { template <typename T> void operator()(T* v) const noexcept { - v->SubRef(); + v->internal_subref(); } }; diff --git a/storage/src/vespa/storage/tools/storage-cmd.cpp b/storage/src/vespa/storage/tools/storage-cmd.cpp index 12299c7458e..bc932fcf6fd 100644 --- a/storage/src/vespa/storage/tools/storage-cmd.cpp +++ b/storage/src/vespa/storage/tools/storage-cmd.cpp @@ -90,7 +90,7 @@ public: req->GetErrorMessage()); continue; } - req->SubRef(); + req->internal_subref(); } FRT_RPCRequest *req = supervisor.supervisor().AllocRPCRequest(); @@ -115,8 +115,8 @@ public: req->GetErrorMessage()); } } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); } return retCode; } diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp index f70ffcc2655..50311e772e2 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp +++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp @@ -501,8 +501,8 @@ Application::isService(FRT_Supervisor &frt, const std::string &spec) const } } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); return ret; } diff --git a/vespalib/src/tests/ref_counted/ref_counted_test.cpp b/vespalib/src/tests/ref_counted/ref_counted_test.cpp index d831e664374..2a5321b7b27 100644 --- a/vespalib/src/tests/ref_counted/ref_counted_test.cpp +++ b/vespalib/src/tests/ref_counted/ref_counted_test.cpp @@ -118,6 +118,7 @@ TEST(RefCountedTest, use_internal_api) { EXPECT_EQ(raw->count_refs(), 1); ref_counted<Base> ref = ref_counted<Base>::internal_attach(raw); EXPECT_EQ(ref->count_refs(), 1); + EXPECT_EQ(ref->val, 20); EXPECT_EQ(ref.internal_detach(), raw); EXPECT_EQ(raw->count_refs(), 1); raw->internal_addref(); @@ -127,6 +128,18 @@ TEST(RefCountedTest, use_internal_api) { raw->internal_subref(); } +TEST(RefCountedTest, use_multi_ref_internal_api) { + CheckObjects check(1); + Base *raw = new Base(20); + EXPECT_EQ(raw->count_refs(), 1); + raw->internal_addref(9); + EXPECT_EQ(raw->count_refs(), 10); + EXPECT_EQ(raw->val, 20); + raw->internal_subref(6, 4); + EXPECT_EQ(raw->count_refs(), 4); + raw->internal_subref(4, 0); +} + TEST(RefCountedTest, move_ref_counted) { for (bool has_src: {true, false}) { for (bool has_dst: {true, false}) { diff --git a/vespalib/src/vespa/vespalib/util/ref_counted.cpp b/vespalib/src/vespa/vespalib/util/ref_counted.cpp index a6928082c58..43a4647e9ec 100644 --- a/vespalib/src/vespa/vespalib/util/ref_counted.cpp +++ b/vespalib/src/vespa/vespalib/util/ref_counted.cpp @@ -6,22 +6,22 @@ namespace vespalib { void -enable_ref_counted::internal_addref() const noexcept +enable_ref_counted::internal_addref(uint32_t cnt) const noexcept { // relaxed because: // the thread obtaining the new reference already has a reference - auto prev = _refs.fetch_add(1, std::memory_order_relaxed); + auto prev = _refs.fetch_add(cnt, std::memory_order_relaxed); assert(prev > 0); } void -enable_ref_counted::internal_subref() const noexcept +enable_ref_counted::internal_subref(uint32_t cnt, [[maybe_unused]] uint32_t reserve) const noexcept { // release because: // our changes to the object must be visible to the deleter - auto prev = _refs.fetch_sub(1, std::memory_order_release); - assert(prev > 0); - if (prev == 1) { + auto prev = _refs.fetch_sub(cnt, std::memory_order_release); + assert(prev >= (reserve + cnt)); + if (prev == cnt) { // acquire because: // we need to see all object changes before deleting it std::atomic_thread_fence(std::memory_order_acquire); @@ -29,7 +29,7 @@ enable_ref_counted::internal_subref() const noexcept } } -int32_t +uint32_t enable_ref_counted::count_refs() const noexcept { auto result = _refs.load(std::memory_order_relaxed); assert(result > 0); diff --git a/vespalib/src/vespa/vespalib/util/ref_counted.h b/vespalib/src/vespa/vespalib/util/ref_counted.h index 0b75cc51d7a..aff8ae7bb7e 100644 --- a/vespalib/src/vespa/vespalib/util/ref_counted.h +++ b/vespalib/src/vespa/vespalib/util/ref_counted.h @@ -27,7 +27,7 @@ class enable_ref_counted static constexpr uint32_t MAGIC = 0xcc56a933; private: uint32_t _guard; - mutable std::atomic<int32_t> _refs; + mutable std::atomic<uint32_t> _refs; protected: enable_ref_counted() noexcept : _guard(MAGIC), _refs(1) {} public: @@ -36,9 +36,11 @@ public: enable_ref_counted(const enable_ref_counted &) = delete; enable_ref_counted &operator=(enable_ref_counted &&) = delete; enable_ref_counted &operator=(const enable_ref_counted &) = delete; - void internal_addref() const noexcept; - void internal_subref() const noexcept; - int32_t count_refs() const noexcept; + void internal_addref(uint32_t cnt) const noexcept; + void internal_addref() const noexcept { internal_addref(1); } + void internal_subref(uint32_t cnt, uint32_t reserve) const noexcept; + void internal_subref() const noexcept { internal_subref(1, 0); } + uint32_t count_refs() const noexcept; }; // This is the handle to a shared object. The handle itself is not |