aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-03-06 11:42:42 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-03-06 15:11:11 +0000
commit793a0bbc6c8bedcf0ee5de51be687d760defcd57 (patch)
tree016f83e652b0734aa3555df9332c72f9a82ff424
parent9a95875744e88f29cb4a78f8bb31bab13e7cec4d (diff)
use ref_counted in fnet
also get rid of some cleanup functions on reference counted classes enable specifying low-level parameters to addref/subref (cnt/reserve)
-rw-r--r--config/src/apps/vespa-configproxy-cmd/proxycmd.cpp4
-rw-r--r--config/src/apps/vespa-get-config/getconfig.cpp2
-rw-r--r--config/src/apps/vespa-ping-configproxy/pingproxy.cpp4
-rw-r--r--config/src/tests/frt/frt.cpp4
-rw-r--r--config/src/vespa/config/file_acquirer/file_acquirer.cpp4
-rw-r--r--config/src/vespa/config/frt/frtconfigrequest.cpp2
-rw-r--r--config/src/vespa/config/frt/frtconfigresponse.cpp4
-rw-r--r--config/src/vespa/config/frt/frtconnection.cpp8
-rw-r--r--configd/src/apps/cmd/main.cpp4
-rw-r--r--configd/src/apps/sentinel/outward-check.cpp4
-rw-r--r--configd/src/apps/sentinel/peer-check.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/echo_client.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_client.cpp4
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_server.cpp2
-rw-r--r--fnet/src/examples/frt/rpc/rpc_client.cpp8
-rw-r--r--fnet/src/examples/frt/rpc/rpc_info.cpp8
-rw-r--r--fnet/src/examples/frt/rpc/rpc_invoke.cpp4
-rw-r--r--fnet/src/examples/ping/pingclient.cpp2
-rw-r--r--fnet/src/examples/ping/pingserver.cpp2
-rw-r--r--fnet/src/tests/connect/connect_test.cpp71
-rw-r--r--fnet/src/tests/connection_spread/connection_spread_test.cpp4
-rw-r--r--fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp20
-rw-r--r--fnet/src/tests/frt/method_pt/method_pt.cpp8
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp4
-rw-r--r--fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp6
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp10
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp6
-rw-r--r--fnet/src/tests/frt/rpc/sharedblob.cpp10
-rw-r--r--fnet/src/tests/info/info.cpp6
-rw-r--r--fnet/src/tests/printstuff/printstuff_test.cpp2
-rw-r--r--fnet/src/tests/transport_debugger/transport_debugger_test.cpp6
-rw-r--r--fnet/src/vespa/fnet/connection.cpp64
-rw-r--r--fnet/src/vespa/fnet/connection.h42
-rw-r--r--fnet/src/vespa/fnet/frt/invoker.cpp4
-rw-r--r--fnet/src/vespa/fnet/frt/packets.cpp2
-rw-r--r--fnet/src/vespa/fnet/frt/reflection.cpp2
-rw-r--r--fnet/src/vespa/fnet/frt/rpcrequest.cpp34
-rw-r--r--fnet/src/vespa/fnet/frt/rpcrequest.h23
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp6
-rw-r--r--fnet/src/vespa/fnet/frt/target.cpp1
-rw-r--r--fnet/src/vespa/fnet/frt/target.h17
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp58
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h48
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp26
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h10
-rw-r--r--jrt_test/src/tests/echo/echo-client.cpp4
-rw-r--r--jrt_test/src/tests/mandatory-methods/extract-reflection.cpp10
-rw-r--r--jrt_test/src/tests/rpc-error/test-errors.cpp18
-rw-r--r--logd/src/logd/rpc_forwarder.cpp2
-rw-r--r--logd/src/logd/rpc_forwarder.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.cpp4
-rw-r--r--searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp2
-rw-r--r--searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp16
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/client_session.cpp16
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp6
-rw-r--r--slobrok/src/apps/check_slobrok/check_slobrok.cpp2
-rw-r--r--slobrok/src/apps/sbcmd/sbcmd.cpp4
-rw-r--r--slobrok/src/tests/mirrorapi/mirrorapi.cpp4
-rw-r--r--slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp6
-rw-r--r--slobrok/src/tests/standalone/standalone.cpp4
-rw-r--r--slobrok/src/tests/startsome/rpc_info.cpp8
-rw-r--r--slobrok/src/vespa/slobrok/sbmirror.cpp8
-rw-r--r--slobrok/src/vespa/slobrok/sbregister.cpp10
-rw-r--r--slobrok/src/vespa/slobrok/server/exchange_manager.cpp2
-rw-r--r--slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp8
-rw-r--r--slobrok/src/vespa/slobrok/server/remote_slobrok.cpp10
-rw-r--r--storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp2
-rw-r--r--storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp4
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp2
-rw-r--r--storage/src/vespa/storage/tools/storage-cmd.cpp6
-rw-r--r--vespaclient/src/vespa/vespaclient/vesparoute/application.cpp4
-rw-r--r--vespalib/src/tests/ref_counted/ref_counted_test.cpp13
-rw-r--r--vespalib/src/vespa/vespalib/util/ref_counted.cpp14
-rw-r--r--vespalib/src/vespa/vespalib/util/ref_counted.h10
78 files changed, 299 insertions, 490 deletions
diff --git a/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp b/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp
index 1d875e0ade2..2bc1c3e94d1 100644
--- a/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp
+++ b/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp
@@ -39,11 +39,11 @@ void ProxyCmd::invokeRPC() {
void ProxyCmd::finiRPC() {
if (_req != nullptr) {
- _req->SubRef();
+ _req->internal_subref();
_req = nullptr;
}
if (_target != nullptr) {
- _target->SubRef();
+ _target->internal_subref();
_target = NULL;
}
_server.reset();
diff --git a/config/src/apps/vespa-get-config/getconfig.cpp b/config/src/apps/vespa-get-config/getconfig.cpp
index 03a049044ae..a31caefe054 100644
--- a/config/src/apps/vespa-get-config/getconfig.cpp
+++ b/config/src/apps/vespa-get-config/getconfig.cpp
@@ -85,7 +85,7 @@ void
GetConfig::finiRPC()
{
if (_target != nullptr) {
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
}
_server.reset();
diff --git a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
index e8423eba233..83a10416852 100644
--- a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
+++ b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
@@ -59,7 +59,7 @@ void
PingProxy::finiRPC()
{
if (_target != nullptr) {
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
}
_server.reset();
@@ -146,7 +146,7 @@ PingProxy::main(int argc, char **argv)
retval = 1;
}
}
- req->SubRef();
+ req->internal_subref();
finiRPC();
return retval;
}
diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp
index 4e77b289f49..c5098d0e7a1 100644
--- a/config/src/tests/frt/frt.cpp
+++ b/config/src/tests/frt/frt.cpp
@@ -96,7 +96,7 @@ struct Response {
~RPCFixture() {
for (size_t i = 0; i < requests.size(); i++) {
- requests[i]->SubRef();
+ requests[i]->internal_subref();
}
}
};
@@ -340,7 +340,7 @@ struct V3RequestFixture {
}
~V3RequestFixture() {
- req->SubRef();
+ req->internal_subref();
}
void encodePayload(const char * payload, uint32_t payloadSize, uint32_t uncompressedSize, const CompressionType & compressionType) {
diff --git a/config/src/vespa/config/file_acquirer/file_acquirer.cpp b/config/src/vespa/config/file_acquirer/file_acquirer.cpp
index 4ee5b4aa4a9..f38875b5727 100644
--- a/config/src/vespa/config/file_acquirer/file_acquirer.cpp
+++ b/config/src/vespa/config/file_acquirer/file_acquirer.cpp
@@ -31,8 +31,8 @@ RpcFileAcquirer::wait_for(const vespalib::string &file_ref, double timeout_s)
LOG(warning, "could not acquire file '%s' (%d: %s)",
file_ref.c_str(), req->GetErrorCode(), req->GetErrorMessage());
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return path;
}
diff --git a/config/src/vespa/config/frt/frtconfigrequest.cpp b/config/src/vespa/config/frt/frtconfigrequest.cpp
index 2106da64857..e7da9f53e97 100644
--- a/config/src/vespa/config/frt/frtconfigrequest.cpp
+++ b/config/src/vespa/config/frt/frtconfigrequest.cpp
@@ -17,7 +17,7 @@ FRTConfigRequest::FRTConfigRequest(Connection * connection, const ConfigKey & ke
FRTConfigRequest::~FRTConfigRequest()
{
- _request->SubRef();
+ _request->internal_subref();
}
bool
diff --git a/config/src/vespa/config/frt/frtconfigresponse.cpp b/config/src/vespa/config/frt/frtconfigresponse.cpp
index 43ae3f9b192..28c2825f71c 100644
--- a/config/src/vespa/config/frt/frtconfigresponse.cpp
+++ b/config/src/vespa/config/frt/frtconfigresponse.cpp
@@ -10,12 +10,12 @@ FRTConfigResponse::FRTConfigResponse(FRT_RPCRequest * request)
_responseState(EMPTY),
_returnValues(_request->GetReturn())
{
- _request->AddRef();
+ _request->internal_addref();
}
FRTConfigResponse::~FRTConfigResponse()
{
- _request->SubRef();
+ _request->internal_subref();
}
bool
diff --git a/config/src/vespa/config/frt/frtconnection.cpp b/config/src/vespa/config/frt/frtconnection.cpp
index be8ccead65c..ff2a82f855b 100644
--- a/config/src/vespa/config/frt/frtconnection.cpp
+++ b/config/src/vespa/config/frt/frtconnection.cpp
@@ -30,7 +30,7 @@ FRTConnection::~FRTConnection()
{
if (_target != nullptr) {
LOG(debug, "Shutting down %s", _address.c_str());
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
}
}
@@ -42,10 +42,10 @@ FRTConnection::getTarget()
if (_target == nullptr) {
_target = _supervisor.GetTarget(_address.c_str());
} else if ( ! _target->IsValid()) {
- _target->SubRef();
+ _target->internal_subref();
_target = _supervisor.GetTarget(_address.c_str());
}
- _target->AddRef();
+ _target->internal_addref();
return _target;
}
@@ -54,7 +54,7 @@ FRTConnection::invoke(FRT_RPCRequest * req, duration timeout, FRT_IRequestWait *
{
FRT_Target * target = getTarget();
target->InvokeAsync(req, vespalib::to_s(timeout), waiter);
- target->SubRef();
+ target->internal_subref();
}
void
diff --git a/configd/src/apps/cmd/main.cpp b/configd/src/apps/cmd/main.cpp
index 3ea26fee150..9ee7130a06e 100644
--- a/configd/src/apps/cmd/main.cpp
+++ b/configd/src/apps/cmd/main.cpp
@@ -74,7 +74,7 @@ void
Cmd::finiRPC()
{
if (_target != nullptr) {
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
}
_server.reset();
@@ -150,7 +150,7 @@ Cmd::run(const Method &cmd, const char *arg)
}
}
}
- req->SubRef();
+ req->internal_subref();
finiRPC();
return retval;
}
diff --git a/configd/src/apps/sentinel/outward-check.cpp b/configd/src/apps/sentinel/outward-check.cpp
index b81c8e23750..b8bb2e69077 100644
--- a/configd/src/apps/sentinel/outward-check.cpp
+++ b/configd/src/apps/sentinel/outward-check.cpp
@@ -53,9 +53,9 @@ void OutwardCheck::RequestDone(FRT_RPCRequest *req) {
req->GetErrorMessage(), req->GetErrorCode());
_result = CcResult::CONN_FAIL;
}
- _req->SubRef();
+ _req->internal_subref();
_req = nullptr;
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
_context.latch.countDown();
}
diff --git a/configd/src/apps/sentinel/peer-check.cpp b/configd/src/apps/sentinel/peer-check.cpp
index 0af5a6fb58f..ac3775d4c4d 100644
--- a/configd/src/apps/sentinel/peer-check.cpp
+++ b/configd/src/apps/sentinel/peer-check.cpp
@@ -39,9 +39,9 @@ void PeerCheck::RequestDone(FRT_RPCRequest *req) {
LOG(debug, "OK ping to %s [port %d]", _hostname.c_str(), _portnum);
_statusOk = true;
}
- _req->SubRef();
+ _req->internal_subref();
_req = nullptr;
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
// Note: will delete this object, so must be called as final step:
_callback.returnStatus(_statusOk);
diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp
index 0176337c466..869a010bfda 100644
--- a/fnet/src/examples/frt/rpc/echo_client.cpp
+++ b/fnet/src/examples/frt/rpc/echo_client.cpp
@@ -85,8 +85,8 @@ EchoClient::main(int argc, char **argv)
} else {
printf("Return values != parameters.\n");
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
index b41c40ba29d..c52d48f24eb 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
@@ -102,8 +102,8 @@ MyApp::main(int argc, char **argv)
ok = false;
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return ok ? 0 : 1;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
index 37a819c6ee7..5c21f73da7f 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
@@ -27,7 +27,7 @@ void do_callback(FRT_RPCRequest *req) {
cb->GetErrorCode(),
cb->GetErrorMessage());
}
- cb->SubRef();
+ cb->internal_subref();
req->Return();
}
diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp
index acb20a880c9..55168a2acba 100644
--- a/fnet/src/examples/frt/rpc/rpc_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_client.cpp
@@ -49,7 +49,7 @@ RPCClient::main(int argc, char **argv)
}
fprintf(stdout, "\nTesting addFloat method\n");
- req->SubRef();
+ req->internal_subref();
req = supervisor.AllocRPCRequest();
req->SetMethodName("addFloat");
req->GetParams()->AddFloat(float1);
@@ -65,7 +65,7 @@ RPCClient::main(int argc, char **argv)
}
fprintf(stdout, "\nTesting addDouble method\n");
- req->SubRef();
+ req->internal_subref();
req = supervisor.AllocRPCRequest();
req->SetMethodName("addDouble");
req->GetParams()->AddDouble(double1);
@@ -80,8 +80,8 @@ RPCClient::main(int argc, char **argv)
req->GetErrorMessage());
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp
index 9734342a24e..ab534a254c2 100644
--- a/fnet/src/examples/frt/rpc/rpc_info.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_info.cpp
@@ -15,16 +15,16 @@ public:
void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor)
{
if ((*req) != nullptr)
- (*req)->SubRef();
+ (*req)->internal_subref();
(*req) = supervisor->AllocRPCRequest();
}
void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2)
{
if (r1 != nullptr)
- r1->SubRef();
+ r1->internal_subref();
if (r2 != nullptr)
- r2->SubRef();
+ r2->internal_subref();
}
void DumpMethodInfo(const char *indent, FRT_RPCRequest *info,
@@ -130,7 +130,7 @@ RPCInfo::main(int argc, char **argv)
m_list->GetErrorMessage());
}
FreeReqs(m_list, info);
- target->SubRef();
+ target->internal_subref();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
index 9f3e90f469a..d56847098d8 100644
--- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
@@ -113,8 +113,8 @@ RPCClient::run(int argc, char **argv)
retCode = 3;
}
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return retCode;
}
diff --git a/fnet/src/examples/ping/pingclient.cpp b/fnet/src/examples/ping/pingclient.cpp
index 43296df7e57..b59df31607a 100644
--- a/fnet/src/examples/ping/pingclient.cpp
+++ b/fnet/src/examples/ping/pingclient.cpp
@@ -86,7 +86,7 @@ PingClient::main(int argc, char **argv)
packet->Free();
}
if (conn != nullptr)
- conn->SubRef();
+ conn->internal_subref();
transport.ShutDown(true);
return 0;
}
diff --git a/fnet/src/examples/ping/pingserver.cpp b/fnet/src/examples/ping/pingserver.cpp
index fb5b12b66c0..79a67cd18a7 100644
--- a/fnet/src/examples/ping/pingserver.cpp
+++ b/fnet/src/examples/ping/pingserver.cpp
@@ -51,7 +51,7 @@ PingServer::main(int argc, char **argv)
FNET_Connector *listener =
transport.Listen(argv[1], &streamer, this);
if (listener != nullptr)
- listener->SubRef();
+ listener->internal_subref();
FNET_SignalShutDown ssd(transport);
transport.Main();
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp
index 9d566cf37a7..d635fea6f94 100644
--- a/fnet/src/tests/connect/connect_test.cpp
+++ b/fnet/src/tests/connect/connect_test.cpp
@@ -86,25 +86,22 @@ struct BlockingCryptoEngine : public CryptoEngine {
//-----------------------------------------------------------------------------
-struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
+struct TransportFixture : FNET_IPacketHandler {
FNET_SimplePacketStreamer streamer;
FNET_Transport transport;
Gate conn_lost;
- Gate conn_deleted;
- TransportFixture() : streamer(nullptr), transport(),
- conn_lost(), conn_deleted()
- {
+ TransportFixture() : streamer(nullptr), transport(), conn_lost() {
transport.Start();
}
TransportFixture(AsyncResolver::HostResolver::SP host_resolver)
: streamer(nullptr), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))),
- conn_lost(), conn_deleted()
+ conn_lost()
{
transport.Start();
}
TransportFixture(CryptoEngine::SP crypto)
: streamer(nullptr), transport(fnet::TransportConfig().crypto(std::move(crypto))),
- conn_lost(), conn_deleted()
+ conn_lost()
{
transport.Start();
}
@@ -114,14 +111,12 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
packet->Free();
return FNET_FREE_CHANNEL;
}
- void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); }
FNET_Connection *connect(const vespalib::string &spec) {
FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer);
ASSERT_TRUE(conn != nullptr);
if (conn->OpenChannel(this, FNET_Context()) == nullptr) {
conn_lost.countDown();
}
- conn->SetCleanupHandler(this);
return conn;
}
~TransportFixture() override {
@@ -131,8 +126,26 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
//-----------------------------------------------------------------------------
-TEST_MT_FFF("require that normal connect works", 2,
- ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60))
+struct ConnCheck {
+ uint64_t target;
+ ConnCheck() : target(FNET_Connection::get_num_connections()) {
+ EXPECT_EQUAL(target, uint64_t(0));
+ }
+ bool at_target() const { return (FNET_Connection::get_num_connections() == target); };
+ bool await(duration max_wait) const {
+ auto until = saturated_add(steady_clock::now(), max_wait);
+ while (!at_target() && steady_clock::now() < until) {
+ std::this_thread::sleep_for(1ms);
+ }
+ return at_target();
+ }
+ void await() const {
+ ASSERT_TRUE(await(3600s));
+ }
+};
+
+TEST_MT_FFFF("require that normal connect works", 2,
+ ServerSocket("tcp/0"), TransportFixture(), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -144,23 +157,23 @@ TEST_MT_FFF("require that normal connect works", 2,
TEST_BARRIER();
conn->Owner()->Close(conn);
f2.conn_lost.await();
- EXPECT_TRUE(!f2.conn_deleted.await(short_time));
- conn->SubRef();
- f2.conn_deleted.await();
+ EXPECT_TRUE(!f3.await(short_time));
+ conn->internal_subref();
+ f3.await();
}
}
-TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) {
+TEST_FFF("require that bogus connect fail asynchronously", TransportFixture(), ConnCheck(), TimeBomb(60)) {
FNET_Connection *conn = f1.connect("invalid");
f1.conn_lost.await();
- EXPECT_TRUE(!f1.conn_deleted.await(short_time));
- conn->SubRef();
- f1.conn_deleted.await();
+ EXPECT_TRUE(!f2.await(short_time));
+ conn->internal_subref();
+ f2.await();
}
-TEST_MT_FFFF("require that async close can be called before async resolve completes", 2,
- ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()),
- TransportFixture(f2), TimeBomb(60))
+TEST_MT_FFFFF("require that async close can be called before async resolve completes", 2,
+ ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()),
+ TransportFixture(f2), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -172,16 +185,16 @@ TEST_MT_FFFF("require that async close can be called before async resolve comple
conn->Owner()->Close(conn);
f3.conn_lost.await();
f2->release_caller();
- EXPECT_TRUE(!f3.conn_deleted.await(short_time));
- conn->SubRef();
- f3.conn_deleted.await();
+ EXPECT_TRUE(!f4.await(short_time));
+ conn->internal_subref();
+ f4.await();
f1.shutdown();
}
}
-TEST_MT_FFFF("require that async close during async do_handshake_work works", 2,
- ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()),
- TransportFixture(f2), TimeBomb(60))
+TEST_MT_FFFFF("require that async close during async do_handshake_work works", 2,
+ ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()),
+ TransportFixture(f2), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -196,10 +209,10 @@ TEST_MT_FFFF("require that async close during async do_handshake_work works", 2,
f3.conn_lost.await();
TEST_BARRIER(); // #1
// verify that pending work keeps relevant objects alive
- EXPECT_TRUE(!f3.conn_deleted.await(short_time));
+ EXPECT_TRUE(!f4.await(short_time));
EXPECT_TRUE(!f2->handshake_socket_deleted.await(short_time));
f2->handshake_work_exit.countDown();
- f3.conn_deleted.await();
+ f4.await();
f2->handshake_socket_deleted.await();
}
}
diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp
index 6286ce65657..5908e6a4982 100644
--- a/fnet/src/tests/connection_spread/connection_spread_test.cpp
+++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp
@@ -84,9 +84,9 @@ TEST_F("require that connections are spread among transport threads", Fixture)
f1.wait_for_components(256, 257);
check_threads(f1.client, 8, "client");
check_threads(f1.server, 8, "server");
- listener->SubRef();
+ listener->internal_subref();
for (FNET_Connection *conn: connections) {
- conn->SubRef();
+ conn->internal_subref();
}
}
diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
index 6325c60413a..c5ca6dc6ce9 100644
--- a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
+++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
@@ -36,7 +36,7 @@ struct RpcFixture : FRT_Invokable {
}
~RpcFixture() {
if (back_conn.load() != nullptr) {
- back_conn.load()->SubRef();
+ back_conn.load()->internal_subref();
}
}
uint32_t port() const { return orb.GetListenPort(); }
@@ -61,7 +61,7 @@ struct RpcFixture : FRT_Invokable {
ASSERT_TRUE(back_conn.load() == nullptr);
back_conn.store(req->GetConnection());
ASSERT_TRUE(back_conn.load() != nullptr);
- back_conn.load()->AddRef();
+ back_conn.load()->internal_addref();
}
FRT_Target *meta_connect(uint32_t port) {
auto *target = orb.Get2WayTarget(fmt("tcp/localhost:%u", port).c_str());
@@ -69,7 +69,7 @@ struct RpcFixture : FRT_Invokable {
req->SetMethodName("connect");
target->InvokeSync(req, 300.0);
ASSERT_TRUE(req->CheckReturnTypes(""));
- req->SubRef();
+ req->internal_subref();
return target;
};
static int check_result(FRT_RPCRequest *req, uint64_t expect) {
@@ -81,7 +81,7 @@ struct RpcFixture : FRT_Invokable {
ASSERT_EQUAL(ret, expect);
++num_ok;
}
- req->SubRef();
+ req->internal_subref();
return num_ok;
}
static int verify_rpc(FNET_Connection *conn) {
@@ -101,7 +101,7 @@ struct RpcFixture : FRT_Invokable {
int verify_rpc(FRT_Target *target, uint32_t port) {
auto *my_target = connect(port);
int num_ok = verify_rpc(target) + verify_rpc(my_target) + verify_rpc(back_conn.load());
- my_target->SubRef();
+ my_target->internal_subref();
return num_ok;
}
};
@@ -135,8 +135,8 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
EXPECT_EQUAL(RpcFixture::verify_rpc(target), 0); // outgoing 2way target should be closed
EXPECT_EQUAL(RpcFixture::verify_rpc(client_target), 1); // pure client target should not be closed
TEST_BARRIER(); // #5
- target->SubRef();
- client_target->SubRef();
+ target->internal_subref();
+ client_target->internal_subref();
} else if (thread_id == 1) { // server 2 (talks to client 2)
auto self = std::make_unique<RpcFixture>(f1);
f3 = self->port();
@@ -146,7 +146,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #3
TEST_BARRIER(); // #4
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
} else if (thread_id == 2) { // client 1 (talks to server 1)
auto self = std::make_unique<RpcFixture>(f1);
f4 = self->port();
@@ -165,7 +165,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #4
EXPECT_EQUAL(self->verify_rpc(target, f2), 0);
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
} else { // client 2 (talks to server 2)
ASSERT_EQUAL(thread_id, 3u);
auto self = std::make_unique<RpcFixture>(f1);
@@ -179,7 +179,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #4
EXPECT_EQUAL(self->verify_rpc(target, f3), 3);
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
}
}
diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp
index 2ac706369ae..d6c42ef7790 100644
--- a/fnet/src/tests/frt/method_pt/method_pt.cpp
+++ b/fnet/src/tests/frt/method_pt/method_pt.cpp
@@ -174,7 +174,7 @@ void finiTest() {
delete _complexHandler;
delete _mediumHandler;
delete _simpleHandler;
- _target->SubRef();
+ _target->internal_subref();
_server.reset();
}
@@ -187,7 +187,7 @@ TEST("method pt") {
//-------------------------------- MEDIUM
- req->SubRef();
+ req->internal_subref();
req = FRT_Supervisor::AllocRPCRequest();
req->SetMethodName("mediumMethod");
_target->InvokeSync(req, 60.0);
@@ -195,7 +195,7 @@ TEST("method pt") {
//-------------------------------- COMPLEX
- req->SubRef();
+ req->internal_subref();
req = FRT_Supervisor::AllocRPCRequest();
req->SetMethodName("complexMethod");
_target->InvokeSync(req, 60.0);
@@ -213,7 +213,7 @@ TEST("method pt") {
fprintf(stderr, "Object inheritance NOT ok for method handlers\n");
}
- req->SubRef();
+ req->internal_subref();
}
//-------------------------------------------------------------
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index 41bfb7d06a6..74f69541d8b 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -125,8 +125,8 @@ void perform_test(size_t thread_id, Client &client, Result &result, bool vital =
BenchmarkTimer::benchmark(invoke, invoke, 0.5);
EXPECT_GREATER_EQUAL(seq, loop_cnt);
result.req_per_sec[thread_id] = double(loop_cnt) / t;
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
TEST_BARRIER();
if (thread_id == 0) {
result.print();
diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
index d15fca93c0b..812f0a57c5e 100644
--- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
@@ -86,7 +86,7 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) {
auto invoke = [&seq, &target, &req, &fixture, reconnect](){
TT_Sample sample(req_tag);
if (reconnect) {
- target->SubRef();
+ target->internal_subref();
target = fixture.connect();
}
req = fixture.orb.AllocRPCRequest(req);
@@ -101,8 +101,8 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) {
auto before = TimeTracer::now();
double t = BenchmarkTimer::benchmark(invoke, 5.0);
auto after = TimeTracer::now();
- target->SubRef();
- req->SubRef();
+ target->internal_subref();
+ req->internal_subref();
auto stats = TimeTracer::extract().by_time(before, after).by_tag(req_tag.id()).get();
ASSERT_TRUE(stats.size() > 0);
std::sort(stats.begin(), stats.end(), DurationCmp());
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index 17c38ab6e3a..9a0f1778cb6 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -31,14 +31,14 @@ struct Server : public FRT_Invokable
void rpc_hook(FRT_RPCRequest *req) {
FNET_Connection *conn = req->GetConnection();
- conn->AddRef(); // need to keep it alive
+ conn->internal_addref(); // need to keep it alive
req->Detach();
req->Return(); // will free request channel
FRT_RPCRequest *r = orb.AllocRPCRequest();
r->SetMethodName("frt.rpc.ping");
// might re-use request channel before it is unlinked from hashmap
orb.InvokeAsync(orb.GetTransport(), conn, r, 5.0, &receptor);
- conn->SubRef(); // invocation will now keep the connection alive as needed
+ conn->internal_subref(); // invocation will now keep the connection alive as needed
}
};
@@ -61,11 +61,11 @@ TEST("detach return invoke") {
}
std::this_thread::sleep_for(10ms);
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
if (receptor.req.load() != nullptr) {
EXPECT_TRUE(!receptor.req.load()->IsError());
- receptor.req.load()->SubRef();
+ receptor.req.load()->internal_subref();
}
EXPECT_TRUE(receptor.req.load() != nullptr);
};
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index e930c1252bf..f06c7428c22 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -65,7 +65,7 @@ public:
}
~MyReq() {
if (_req != nullptr) {
- _req->SubRef();
+ _req->internal_subref();
}
}
MyReq(const MyReq &rhs) = delete;
@@ -331,7 +331,7 @@ public:
}
~Fixture() {
- _target->SubRef();
+ _target->internal_subref();
}
};
@@ -410,7 +410,7 @@ TEST_F("require that a bad target gives connection error", Fixture()) {
{
FRT_Target *bad_target = f1.make_bad_target();
bad_target->InvokeSync(req.borrow(), timeout);
- bad_target->SubRef();
+ bad_target->internal_subref();
}
EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_CONNECTION);
}
diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp
index 2ccb44d03cb..94f57a136a4 100644
--- a/fnet/src/tests/frt/rpc/sharedblob.cpp
+++ b/fnet/src/tests/frt/rpc/sharedblob.cpp
@@ -122,7 +122,7 @@ struct ServerSampler : public FRT_Invokable
dataSet.sample(*req->GetReturn()); // server return before drop
// keep request to sample return after drop
- req->AddRef();
+ req->internal_addref();
serverReq = req;
}
};
@@ -176,7 +176,7 @@ TEST("testExplicitShared") {
req->GetParams()->AddSharedData(&blob);
EXPECT_EQUAL(4, blob.refcnt);
- req->SubRef();
+ req->internal_subref();
EXPECT_EQUAL(1, blob.refcnt);
}
@@ -262,10 +262,10 @@ TEST("testImplicitShared") {
}
if (serverSampler.serverReq != 0) {
- serverSampler.serverReq->SubRef();
+ serverSampler.serverReq->internal_subref();
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index 00075cb75dd..92fe6526a10 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -63,9 +63,9 @@ TEST("info") {
fprintf(stderr, "FD_SETSIZE: %d\n", l[2]._intval32);
fprintf(stderr, "sizeof(FRT_RPCRequest): %d\n", l[3]._intval32);
- target->SubRef();
- local_info->SubRef();
- remote_info->SubRef();
+ target->internal_subref();
+ local_info->internal_subref();
+ remote_info->internal_subref();
};
TEST("size of important objects")
diff --git a/fnet/src/tests/printstuff/printstuff_test.cpp b/fnet/src/tests/printstuff/printstuff_test.cpp
index a9621728c5a..bd76ad29405 100644
--- a/fnet/src/tests/printstuff/printstuff_test.cpp
+++ b/fnet/src/tests/printstuff/printstuff_test.cpp
@@ -37,7 +37,7 @@ TEST("rpc packets in a queue") {
q2.QueuePacket(&req->getStash().create<FRT_RPCRequestPacket>(req, 0, false), FNET_Context());
q2.Print();
}
- req->SubRef();
+ req->internal_subref();
}
TEST("info") {
diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
index a363b1df4c2..eaf2fd71bde 100644
--- a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
+++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
@@ -113,9 +113,9 @@ TEST_FF("transport layers can be run with transport debugger", Fixture(), vespal
EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT);
ASSERT_TRUE(req6->CheckReturnTypes("l"));
EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u);
- target->SubRef();
- req4->SubRef();
- req6->SubRef();
+ target->internal_subref();
+ req4->internal_subref();
+ req6->internal_subref();
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index e344f2a22a6..fef8a6bf01b 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -63,7 +63,7 @@ struct DoHandshakeWork : vespalib::Executor::Task {
DoHandshakeWork(FNET_Connection *conn_in, vespalib::CryptoSocket *socket_in)
: conn(conn_in), socket(socket_in)
{
- conn->AddRef();
+ conn->internal_addref();
}
void run() override {
socket->do_handshake_work();
@@ -82,7 +82,7 @@ FNET_Connection::ResolveHandler::ResolveHandler(FNET_Connection *conn)
: connection(conn),
address()
{
- connection->AddRef();
+ connection->internal_addref();
}
void
@@ -94,7 +94,7 @@ FNET_Connection::ResolveHandler::handle_result(vespalib::SocketAddress result)
FNET_Connection::ResolveHandler::~ResolveHandler()
{
- connection->SubRef();
+ connection->internal_subref();
}
@@ -149,10 +149,9 @@ FNET_Connection::SetState(State state)
}
if ( ! toDelete.empty() ) {
- for (const FNET_Channel::UP & ch : toDelete) {
- (void) ch;
- SubRef_NoLock();
- }
+ const uint32_t cnt = toDelete.size();
+ const uint32_t reserve = 1;
+ internal_subref(cnt, reserve);
}
}
@@ -185,14 +184,14 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
_channels.Unregister(channel);
if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) {
- SubRef_NoLock();
+ internal_subref(1, 1);
toDelete.reset(channel);
}
}
} else if (CanAcceptChannels() && IsFromPeer(chid)) { // open new channel
FNET_Channel::UP newChannel(new FNET_Channel(chid, this));
channel = newChannel.get();
- AddRef_NoLock();
+ internal_addref();
BeforeCallback(guard, channel);
if (_serverAdapter->InitChannel(channel, pcode)) {
@@ -203,7 +202,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
AfterCallback(guard);
if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) {
- SubRef_NoLock();
+ internal_subref(1, 1);
} else if (hp_rc == FNET_IPacketHandler::FNET_KEEP_CHANNEL) {
_channels.Register(newChannel.release());
} else {
@@ -212,7 +211,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
} else {
AfterCallback(guard);
- SubRef_NoLock();
+ internal_subref(1, 1);
guard.unlock();
LOG(debug, "Connection(%s): channel init failed", GetSpec());
@@ -489,8 +488,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_myQueue(256),
_output(0),
_channels(),
- _callbackTarget(nullptr),
- _cleanup(nullptr)
+ _callbackTarget(nullptr)
{
assert(_socket && (_socket->get_fd() >= 0));
_num_connections.fetch_add(1, std::memory_order_relaxed);
@@ -520,8 +518,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_myQueue(256),
_output(0),
_channels(),
- _callbackTarget(nullptr),
- _cleanup(nullptr)
+ _callbackTarget(nullptr)
{
_num_connections.fetch_add(1, std::memory_order_relaxed);
}
@@ -530,7 +527,6 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
FNET_Connection::~FNET_Connection()
{
assert(!_resolve_handler);
- assert(_cleanup == nullptr);
_num_connections.fetch_sub(1, std::memory_order_relaxed);
}
@@ -576,12 +572,6 @@ FNET_Connection::handle_handshake_act()
return ((GetState() == FNET_CONNECTING) && handshake());
}
-void
-FNET_Connection::SetCleanupHandler(FNET_IConnectionCleanupHandler *handler)
-{
- _cleanup = handler;
-}
-
FNET_Channel*
FNET_Connection::OpenChannel(FNET_IPacketHandler *handler,
@@ -598,7 +588,7 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler,
*chid = newChannel->GetID();
}
WaitCallback(guard, nullptr);
- AddRef_NoLock();
+ internal_addref();
ret = newChannel.release();
_channels.Register(ret);
}
@@ -614,7 +604,7 @@ FNET_Connection::OpenChannel()
{
std::lock_guard<std::mutex> guard(_ioc_lock);
chid = GetNextID();
- AddRef_NoLock();
+ internal_addref();
}
return new FNET_Channel(chid, this);
}
@@ -633,18 +623,20 @@ void
FNET_Connection::FreeChannel(FNET_Channel *channel)
{
delete channel;
- SubRef_HasLock(std::unique_lock<std::mutex>(_ioc_lock));
+ internal_subref();
}
void
FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel)
{
- std::unique_lock<std::mutex> guard(_ioc_lock);
- WaitCallback(guard, channel);
- _channels.Unregister(channel);
- SubRef_HasLock(std::move(guard));
- delete channel;
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ WaitCallback(guard, channel);
+ _channels.Unregister(channel);
+ delete channel;
+ }
+ internal_subref();
}
@@ -668,7 +660,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
_writeWork++;
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
if ((writeWork == 0) && (GetState() == FNET_CONNECTED)) {
- AddRef_NoLock();
+ internal_addref();
guard.unlock();
Owner()->EnableWrite(this, /* needRef = */ false);
}
@@ -686,16 +678,6 @@ FNET_Connection::Sync()
void
-FNET_Connection::CleanupHook()
-{
- if (_cleanup != nullptr) {
- _cleanup->Cleanup(this);
- _cleanup = nullptr;
- }
-}
-
-
-void
FNET_Connection::Close()
{
_resolve_handler.reset();
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 10cf74e79de..80927fd375c 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -21,30 +21,6 @@ class FNET_IPacketHandler;
namespace vespalib::net { class ConnectionAuthContext; }
/**
- * Interface implemented by objects that want to perform connection
- * cleanup. Use the SetCleanupHandler method to register with a
- * connection. Currently, there can only be one cleanup handler per
- * connection.
- **/
-class FNET_IConnectionCleanupHandler
-{
-public:
-
- /**
- * Destructor. No cleanup needed for base class.
- */
- virtual ~FNET_IConnectionCleanupHandler(void) {}
-
- /**
- * Perform connection cleanup.
- *
- * @param conn the connection
- **/
- virtual void Cleanup(FNET_Connection *conn) = 0;
-};
-
-
-/**
* This class represents a single connection with another
* computer. The binary format on a connection is defined by the
* PacketStreamer given to the constructor. Each connection object may
@@ -115,8 +91,6 @@ private:
FNET_ChannelLookup _channels; // channel 'DB'
FNET_Channel *_callbackTarget; // target of current callback
- FNET_IConnectionCleanupHandler *_cleanup; // cleanup handler
-
std::unique_ptr<vespalib::net::ConnectionAuthContext> _auth_context;
static std::atomic<uint64_t> _num_connections; // total number of connections
@@ -377,14 +351,6 @@ public:
**/
bool handle_handshake_act() override;
- /**
- * Register a cleanup handler to be invoked when this connection is
- * about to be destructed.
- *
- * @param handler the cleanup handler
- **/
- void SetCleanupHandler(FNET_IConnectionCleanupHandler *handler);
-
/**
* Open a new channel on this connection. This method will return
@@ -468,14 +434,6 @@ public:
/**
- * Invoked by the io component superclass before the object is
- * destructed. Will invoke the Cleanup method on the cleanup handler
- * for this connection, if present.
- **/
- void CleanupHook() override;
-
-
- /**
* Close this connection immidiately. NOTE: this method should only
* be called by the transport thread.
**/
diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp
index d4b35720a8d..421bd0c4d0b 100644
--- a/fnet/src/vespa/fnet/frt/invoker.cpp
+++ b/fnet/src/vespa/fnet/frt/invoker.cpp
@@ -98,7 +98,7 @@ FRT_RPCInvoker::HandleDone(bool freeChannel)
}
// send response to client or get rid of it
if (_noReply || (_req->GetErrorCode() == FRTE_RPC_BAD_REQUEST))
- _req->SubRef();
+ _req->internal_subref();
else
ch->Send(_req->CreateReplyPacket());
@@ -128,7 +128,7 @@ void FRT_HookInvoker::Invoke()
_req->SetDetachedPT(&detached);
(_hook->GetHandler()->*_hook->GetMethod())(_req);
assert(!detached);
- _req->SubRef();
+ _req->internal_subref();
}
void
diff --git a/fnet/src/vespa/fnet/frt/packets.cpp b/fnet/src/vespa/fnet/frt/packets.cpp
index 134a869eafb..5e600c8caa3 100644
--- a/fnet/src/vespa/fnet/frt/packets.cpp
+++ b/fnet/src/vespa/fnet/frt/packets.cpp
@@ -14,7 +14,7 @@ FRT_RPCPacket::Free()
{
if (_ownsRef) {
_req->DiscardBlobs();
- _req->SubRef();
+ _req->internal_subref();
}
}
diff --git a/fnet/src/vespa/fnet/frt/reflection.cpp b/fnet/src/vespa/fnet/frt/reflection.cpp
index af7fa069eb9..c09057ea675 100644
--- a/fnet/src/vespa/fnet/frt/reflection.cpp
+++ b/fnet/src/vespa/fnet/frt/reflection.cpp
@@ -153,7 +153,7 @@ FRT_ReflectionBuilder::FRT_ReflectionBuilder(FRT_Supervisor *supervisor)
FRT_ReflectionBuilder::~FRT_ReflectionBuilder()
{
Flush();
- _req->SubRef();
+ _req->internal_subref();
}
diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.cpp b/fnet/src/vespa/fnet/frt/rpcrequest.cpp
index ac6dbb26ad6..6870a275ab1 100644
--- a/fnet/src/vespa/fnet/frt/rpcrequest.cpp
+++ b/fnet/src/vespa/fnet/frt/rpcrequest.cpp
@@ -10,7 +10,6 @@ FRT_RPCRequest::FRT_RPCRequest()
_context(),
_params(_stash),
_return(_stash),
- _refcnt(1),
_completed(0),
_errorCode(FRTE_NO_ERROR),
_errorMessageLen(0),
@@ -19,14 +18,10 @@ FRT_RPCRequest::FRT_RPCRequest()
_methodName(nullptr),
_detachedPT(nullptr),
_abortHandler(nullptr),
- _returnHandler(nullptr),
- _cleanupHandler(nullptr)
+ _returnHandler(nullptr)
{ }
-FRT_RPCRequest::~FRT_RPCRequest()
-{
- assert(_refcnt == 0);
-}
+FRT_RPCRequest::~FRT_RPCRequest() = default;
void
FRT_RPCRequest::SetError(uint32_t errorCode, const char *errorMessage, uint32_t errorMessageLen)
@@ -87,18 +82,9 @@ FRT_RPCRequest::GetConnection() {
return _returnHandler->GetConnection();
}
-void
-FRT_RPCRequest::Cleanup() {
- if (_cleanupHandler != nullptr) {
- _cleanupHandler->HandleCleanup();
- _cleanupHandler = nullptr;
- }
-}
void
FRT_RPCRequest::Reset() {
- assert(_refcnt <= 1);
- Cleanup();
_context = FNET_Context();
_params.Reset();
_return.Reset();
@@ -118,7 +104,7 @@ FRT_RPCRequest::Reset() {
bool
FRT_RPCRequest::Recycle()
{
- if (_refcnt > 1 || _errorCode != FRTE_NO_ERROR)
+ if (count_refs() > 1 || _errorCode != FRTE_NO_ERROR)
return false;
Reset();
return true;
@@ -126,18 +112,6 @@ FRT_RPCRequest::Recycle()
void
-FRT_RPCRequest::SubRef()
-{
- int oldVal = _refcnt.fetch_sub(1);
- assert(oldVal > 0);
- if (oldVal == 1) {
- Reset();
- delete this;
- }
-}
-
-
-void
FRT_RPCRequest::Print(uint32_t indent)
{
printf("%*sFRT_RPCRequest {\n", indent, "");
@@ -163,7 +137,7 @@ FRT_RPCRequest::CreateRequestPacket(bool wantReply)
flags |= FLAG_FRT_RPC_LITTLE_ENDIAN;
if (wantReply)
- AddRef();
+ internal_addref();
else
flags |= FLAG_FRT_RPC_NOREPLY;
diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.h b/fnet/src/vespa/fnet/frt/rpcrequest.h
index a095c274687..72dae4a6af1 100644
--- a/fnet/src/vespa/fnet/frt/rpcrequest.h
+++ b/fnet/src/vespa/fnet/frt/rpcrequest.h
@@ -6,6 +6,7 @@
#include "error.h"
#include <vespa/fnet/context.h>
#include <vespa/vespalib/util/stash.h>
+#include <vespa/vespalib/util/ref_counted.h>
#include <atomic>
class FNET_Packet;
@@ -37,20 +38,7 @@ public:
};
-class FRT_ICleanupHandler
-{
-public:
-
- /**
- * Destructor. No cleanup needed for base class.
- */
- virtual ~FRT_ICleanupHandler(void) {}
-
- virtual void HandleCleanup() = 0;
-};
-
-
-class FRT_RPCRequest
+class FRT_RPCRequest : public vespalib::enable_ref_counted
{
private:
using Stash = vespalib::Stash;
@@ -58,7 +46,6 @@ private:
FNET_Context _context;
FRT_Values _params;
FRT_Values _return;
- std::atomic<int> _refcnt;
std::atomic<int> _completed;
uint32_t _errorCode;
uint32_t _errorMessageLen;
@@ -69,7 +56,6 @@ private:
bool *_detachedPT;
FRT_IAbortHandler *_abortHandler;
FRT_IReturnHandler *_returnHandler;
- FRT_ICleanupHandler *_cleanupHandler;
public:
FRT_RPCRequest(const FRT_RPCRequest &) = delete;
@@ -86,9 +72,6 @@ public:
_return.DiscardBlobs();
}
- void AddRef() { _refcnt.fetch_add(1); }
- void SubRef();
-
void SetContext(FNET_Context context) { _context = context; }
FNET_Context GetContext() { return _context; }
@@ -137,10 +120,8 @@ public:
void SetAbortHandler(FRT_IAbortHandler *handler) { _abortHandler = handler; }
void SetReturnHandler(FRT_IReturnHandler *handler) { _returnHandler = handler; }
- void SetCleanupHandler(FRT_ICleanupHandler *handler) { _cleanupHandler = handler; }
bool Abort();
void Return();
FNET_Connection *GetConnection();
- void Cleanup();
};
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index 966c606bf97..7d6b4d727c7 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -31,7 +31,7 @@ FRT_Supervisor::~FRT_Supervisor()
{
_transport->detach(this);
if (_connector != nullptr) {
- _connector->SubRef();
+ _connector->internal_subref();
}
}
@@ -99,7 +99,7 @@ FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein)
if (tradein->Recycle()) {
return tradein;
}
- tradein->SubRef();
+ tradein->internal_subref();
}
return new FRT_RPCRequest();
}
@@ -113,7 +113,7 @@ FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req)
ch->Send(req->CreateRequestPacket(false));
ch->Free();
} else {
- req->SubRef();
+ req->internal_subref();
}
}
diff --git a/fnet/src/vespa/fnet/frt/target.cpp b/fnet/src/vespa/fnet/frt/target.cpp
index 1645fba91ee..47baa95b9e2 100644
--- a/fnet/src/vespa/fnet/frt/target.cpp
+++ b/fnet/src/vespa/fnet/frt/target.cpp
@@ -6,7 +6,6 @@
FRT_Target::~FRT_Target()
{
- assert(_refcnt == 0);
FNET_Connection * conn(_conn);
_conn = nullptr;
if (conn != nullptr) {
diff --git a/fnet/src/vespa/fnet/frt/target.h b/fnet/src/vespa/fnet/frt/target.h
index ac6bf377a14..773daaca8b1 100644
--- a/fnet/src/vespa/fnet/frt/target.h
+++ b/fnet/src/vespa/fnet/frt/target.h
@@ -3,16 +3,16 @@
#pragma once
#include <vespa/fnet/connection.h>
+#include <vespa/vespalib/util/ref_counted.h>
#include <atomic>
class FNET_Scheduler;
class FRT_RPCRequest;
class FRT_IRequestWait;
-class FRT_Target
+class FRT_Target : public vespalib::enable_ref_counted
{
private:
- std::atomic<int> _refcnt;
FNET_Scheduler *_scheduler;
FNET_Connection *_conn;
@@ -21,23 +21,12 @@ private:
public:
FRT_Target(FNET_Scheduler *scheduler, FNET_Connection *conn)
- : _refcnt(1),
- _scheduler(scheduler),
+ : _scheduler(scheduler),
_conn(conn) {}
~FRT_Target();
FNET_Connection *GetConnection() const { return _conn; }
-
- void AddRef() { _refcnt.fetch_add(1); }
- void SubRef() {
- if (_refcnt.fetch_sub(1) == 1) {
- delete this;
- }
- }
-
- int GetRefCnt() const { return _refcnt; }
-
bool IsValid() {
return ((_conn != nullptr) &&
(_conn->GetState() <= FNET_Connection::FNET_CONNECTED));
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index f08718c0c5c..c4fc7d859d4 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -16,7 +16,6 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner,
_ioc_spec(spec),
_flags(shouldTimeOut),
_ioc_socket_fd(socket_fd),
- _ioc_refcnt(1),
_ioc_timestamp(vespalib::steady_clock::now()),
_ioc_lock(),
_ioc_cond()
@@ -39,58 +38,6 @@ FNET_IOComponent::UpdateTimeOut() {
_ioc_owner->UpdateTimeOut(this);
}
-void
-FNET_IOComponent::AddRef()
-{
- std::lock_guard<std::mutex> guard(_ioc_lock);
- assert(_ioc_refcnt > 0);
- _ioc_refcnt++;
-}
-
-
-void
-FNET_IOComponent::AddRef_NoLock()
-{
- assert(_ioc_refcnt > 0);
- _ioc_refcnt++;
-}
-
-
-void
-FNET_IOComponent::SubRef()
-{
- {
- std::lock_guard<std::mutex> guard(_ioc_lock);
- assert(_ioc_refcnt > 0);
- if (--_ioc_refcnt > 0) {
- return;
- }
- }
- CleanupHook();
- delete this;
-}
-
-
-void
-FNET_IOComponent::SubRef_HasLock(std::unique_lock<std::mutex> guard)
-{
- assert(_ioc_refcnt > 0);
- if (--_ioc_refcnt > 0) {
- return;
- }
- guard.unlock();
- CleanupHook();
- delete this;
-}
-
-
-void
-FNET_IOComponent::SubRef_NoLock()
-{
- assert(_ioc_refcnt > 1);
- _ioc_refcnt--;
-}
-
void
FNET_IOComponent::attach_selector(Selector &selector)
@@ -141,8 +88,3 @@ FNET_IOComponent::handle_handshake_act()
{
return true;
}
-
-void
-FNET_IOComponent::CleanupHook()
-{
-}
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index 106e31c9236..b88b2700db5 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -4,6 +4,7 @@
#include "scheduler.h"
#include <vespa/vespalib/net/selector.h>
+#include <vespa/vespalib/util/ref_counted.h>
#include <mutex>
#include <condition_variable>
#include <chrono>
@@ -18,7 +19,7 @@ class FNET_Config;
* Components do IO against the network and that they use sockets to
* perform that IO.
**/
-class FNET_IOComponent
+class FNET_IOComponent : public vespalib::enable_ref_counted
{
friend class FNET_TransportThread;
@@ -46,7 +47,6 @@ protected:
std::string _ioc_spec; // connect/listen spec
Flags _flags; // Compressed representation of boolean flags;
int _ioc_socket_fd; // source of events.
- uint32_t _ioc_refcnt; // reference counter
vespalib::steady_time _ioc_timestamp; // last I/O activity
std::mutex _ioc_lock; // synchronization
std::condition_variable _ioc_cond; // synchronization
@@ -88,43 +88,6 @@ public:
std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_ioc_lock); }
/**
- * Allocate a reference to this component. This method locks the
- * object to protect the reference counter.
- **/
- void AddRef();
-
-
- /**
- * Allocate a reference to this component without locking the
- * object. Caller already has lock on object.
- **/
- void AddRef_NoLock();
-
-
- /**
- * Free a reference to this component. This method locks the object
- * to protect the reference counter.
- **/
- void SubRef();
-
-
- /**
- * Free a reference to this component. This method uses locking to
- * protect the reference counter, but assumes that the lock has
- * already been obtained when this method is called.
- **/
- void SubRef_HasLock(std::unique_lock<std::mutex> guard);
-
-
- /**
- * Free a reference to this component without locking the
- * object. NOTE: this method may only be called on objects with more
- * than one reference.
- **/
- void SubRef_NoLock();
-
-
- /**
* @return the owning TransportThread object.
**/
FNET_TransportThread *Owner() { return _ioc_owner; }
@@ -217,13 +180,6 @@ public:
**/
virtual bool handle_handshake_act();
- /**
- * This method is called by the SubRef methods just before the
- * object is deleted. It may be used to perform cleanup tasks that
- * must be done before the destructor is invoked.
- **/
- virtual void CleanupHook();
-
/**
* Close this component immediately. NOTE: this method should only
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 970dc40150f..0b0df02c04c 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -107,7 +107,7 @@ FNET_TransportThread::FlushDeleteList()
FNET_IOComponent *tmp = _deleteList;
_deleteList = tmp->_ioc_next;
assert(tmp->_flags._ioc_delete);
- tmp->SubRef();
+ tmp->internal_subref();
}
}
@@ -143,12 +143,12 @@ FNET_TransportThread::DiscardEvent(FNET_ControlPacket *cpacket,
switch (cpacket->GetCommand()) {
case FNET_ControlPacket::FNET_CMD_IOC_ADD:
context._value.IOC->Close();
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
break;
case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE:
case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT:
case FNET_ControlPacket::FNET_CMD_IOC_CLOSE:
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
break;
}
}
@@ -173,7 +173,7 @@ FNET_TransportThread::handle_close_cmd(FNET_IOComponent *ioc)
{
if (ioc->_flags._ioc_added) {
RemoveComponent(ioc);
- ioc->SubRef();
+ ioc->internal_subref();
}
ioc->Close();
AddDeleteComponent(ioc);
@@ -288,7 +288,7 @@ FNET_TransportThread::Listen(const char *spec, FNET_IPacketStreamer *streamer,
if (server_socket.valid() && server_socket.set_blocking(false)) {
FNET_Connector *connector = new FNET_Connector(this, streamer, serverAdapter, spec, std::move(server_socket));
connector->EnableReadEvent(true);
- connector->AddRef_NoLock();
+ connector->internal_addref();
Add(connector, /* needRef = */ false);
return connector;
}
@@ -314,7 +314,7 @@ void
FNET_TransportThread::Add(FNET_IOComponent *comp, bool needRef)
{
if (needRef) {
- comp->AddRef();
+ comp->internal_addref();
}
PostEvent(&FNET_ControlPacket::IOCAdd, FNET_Context(comp));
}
@@ -324,7 +324,7 @@ void
FNET_TransportThread::EnableWrite(FNET_IOComponent *comp, bool needRef)
{
if (needRef) {
- comp->AddRef();
+ comp->internal_addref();
}
PostEvent(&FNET_ControlPacket::IOCEnableWrite, FNET_Context(comp));
}
@@ -333,7 +333,7 @@ void
FNET_TransportThread::handshake_act(FNET_IOComponent *comp, bool needRef)
{
if (needRef) {
- comp->AddRef();
+ comp->internal_addref();
}
PostEvent(&FNET_ControlPacket::IOCHandshakeACT, FNET_Context(comp));
}
@@ -342,7 +342,7 @@ void
FNET_TransportThread::Close(FNET_IOComponent *comp, bool needRef)
{
if (needRef) {
- comp->AddRef();
+ comp->internal_addref();
}
PostEvent(&FNET_ControlPacket::IOCClose, FNET_Context(comp));
}
@@ -449,7 +449,7 @@ FNET_TransportThread::handle_wakeup()
}
if (context._value.IOC->_flags._ioc_delete) {
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
continue;
}
@@ -460,14 +460,14 @@ FNET_TransportThread::handle_wakeup()
case FNET_ControlPacket::FNET_CMD_IOC_ENABLE_WRITE:
context._value.IOC->EnableWriteEvent(true);
if (context._value.IOC->HandleWriteEvent()) {
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
} else {
handle_close_cmd(context._value.IOC);
}
break;
case FNET_ControlPacket::FNET_CMD_IOC_HANDSHAKE_ACT:
if (context._value.IOC->handle_handshake_act()) {
- context._value.IOC->SubRef();
+ context._value.IOC->internal_subref();
} else {
handle_close_cmd(context._value.IOC);
}
@@ -577,7 +577,7 @@ FNET_TransportThread::endEventLoop() {
component = component->_ioc_next;
RemoveComponent(tmp);
tmp->Close();
- tmp->SubRef();
+ tmp->internal_subref();
}
assert(_componentsHead == nullptr &&
_componentsTail == nullptr &&
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 1744a3d60e5..6047d4e3482 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -97,7 +97,7 @@ private:
/**
- * Delete (call SubRef on) all IO Components in the delete list.
+ * Delete (call internal_subref on) all IO Components in the delete list.
**/
void FlushDeleteList();
@@ -277,7 +277,7 @@ public:
* @param needRef should be set to false if the caller of this
* method already has obtained an extra reference to the
* component. If this flag is true, this method will call the
- * AddRef method on the component.
+ * internal_addref method on the component.
**/
void Add(FNET_IOComponent *comp, bool needRef = true);
@@ -294,7 +294,7 @@ public:
* @param needRef should be set to false if the caller of this
* method already has obtained an extra reference to the
* component. If this flag is true, this method will call the
- * AddRef method on the component.
+ * internal_addref method on the component.
**/
void EnableWrite(FNET_IOComponent *comp, bool needRef = true);
@@ -312,7 +312,7 @@ public:
* @param needRef should be set to false if the caller of this
* method already has obtained an extra reference to the
* component. If this flag is true, this method will call the
- * AddRef method on the component.
+ * internal_addref method on the component.
**/
void handshake_act(FNET_IOComponent *comp, bool needRef = true);
@@ -330,7 +330,7 @@ public:
* @param needRef should be set to false if the caller of this
* method already has obtained an extra reference to the
* component. If this flag is true, this method will call the
- * AddRef method on the component.
+ * internal_addref method on the component.
**/
void Close(FNET_IOComponent *comp, bool needRef = true);
diff --git a/jrt_test/src/tests/echo/echo-client.cpp b/jrt_test/src/tests/echo/echo-client.cpp
index a7c8d309114..16a4a2877b2 100644
--- a/jrt_test/src/tests/echo/echo-client.cpp
+++ b/jrt_test/src/tests/echo/echo-client.cpp
@@ -79,8 +79,8 @@ public:
} else {
printf("Return values != parameters.\n");
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return 0;
}
};
diff --git a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
index cdcf02da465..fae7bc90abb 100644
--- a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
+++ b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
@@ -14,16 +14,16 @@ public:
void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor)
{
if ((*req) != nullptr)
- (*req)->SubRef();
+ (*req)->internal_subref();
(*req) = supervisor->AllocRPCRequest();
}
void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2)
{
if (r1 != nullptr)
- r1->SubRef();
+ r1->internal_subref();
if (r2 != nullptr)
- r2->SubRef();
+ r2->internal_subref();
}
void DumpMethodInfo(const char *indent, FRT_RPCRequest *info,
@@ -90,7 +90,7 @@ public:
break;
}
std::this_thread::sleep_for(1s);
- target->SubRef();
+ target->internal_subref();
target = supervisor.GetTarget(argv[1]);
}
if (info->IsError()) {
@@ -133,7 +133,7 @@ public:
m_list->GetErrorMessage());
}
FreeReqs(m_list, info);
- target->SubRef();
+ target->internal_subref();
return 0;
}
};
diff --git a/jrt_test/src/tests/rpc-error/test-errors.cpp b/jrt_test/src/tests/rpc-error/test-errors.cpp
index e64c2abfff6..1683d1e11f8 100644
--- a/jrt_test/src/tests/rpc-error/test-errors.cpp
+++ b/jrt_test/src/tests/rpc-error/test-errors.cpp
@@ -20,7 +20,7 @@ public:
}
void fini() {
- target->SubRef();
+ target->internal_subref();
target = nullptr;
client = nullptr;
}
@@ -51,7 +51,7 @@ TestErrors::testNoError()
} else {
EXPECT_TRUE(false);
}
- req1->SubRef();
+ req1->internal_subref();
}
@@ -64,7 +64,7 @@ TestErrors::testNoSuchMethod()
EXPECT_TRUE(req1->IsError());
EXPECT_TRUE(0 == req1->GetReturn()->GetNumValues());
EXPECT_TRUE(FRTE_RPC_NO_SUCH_METHOD == req1->GetErrorCode());
- req1->SubRef();
+ req1->internal_subref();
}
@@ -80,7 +80,7 @@ TestErrors::testWrongParameters()
EXPECT_TRUE(req1->IsError());
EXPECT_TRUE(0 == req1->GetReturn()->GetNumValues());
EXPECT_TRUE(FRTE_RPC_WRONG_PARAMS == req1->GetErrorCode());
- req1->SubRef();
+ req1->internal_subref();
FRT_RPCRequest *req2 = client->AllocRPCRequest();
req2->SetMethodName("test");
@@ -90,7 +90,7 @@ TestErrors::testWrongParameters()
EXPECT_TRUE(req2->IsError());
EXPECT_TRUE(0 == req2->GetReturn()->GetNumValues());
EXPECT_TRUE(FRTE_RPC_WRONG_PARAMS == req2->GetErrorCode());
- req2->SubRef();
+ req2->internal_subref();
FRT_RPCRequest *req3 = client->AllocRPCRequest();
req3->SetMethodName("test");
@@ -102,7 +102,7 @@ TestErrors::testWrongParameters()
EXPECT_TRUE(req3->IsError());
EXPECT_TRUE(0 == req3->GetReturn()->GetNumValues());
EXPECT_TRUE(FRTE_RPC_WRONG_PARAMS == req3->GetErrorCode());
- req3->SubRef();
+ req3->internal_subref();
}
@@ -118,7 +118,7 @@ TestErrors::testWrongReturnValues()
EXPECT_TRUE(req1->IsError());
EXPECT_TRUE(0 == req1->GetReturn()->GetNumValues());
EXPECT_TRUE(FRTE_RPC_WRONG_RETURN == req1->GetErrorCode());
- req1->SubRef();
+ req1->internal_subref();
}
@@ -134,7 +134,7 @@ TestErrors::testMethodFailed()
EXPECT_TRUE(req1->IsError());
EXPECT_TRUE(0 == req1->GetReturn()->GetNumValues());
EXPECT_TRUE(75000 == req1->GetErrorCode());
- req1->SubRef();
+ req1->internal_subref();
FRT_RPCRequest *req2 = client->AllocRPCRequest();
req2->SetMethodName("test");
@@ -145,7 +145,7 @@ TestErrors::testMethodFailed()
EXPECT_TRUE(req2->IsError());
EXPECT_TRUE(0 == req2->GetReturn()->GetNumValues());
EXPECT_TRUE(75000 == req2->GetErrorCode());
- req2->SubRef();
+ req2->internal_subref();
}
diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp
index fbd39afeba4..b89eb87e2f7 100644
--- a/logd/src/logd/rpc_forwarder.cpp
+++ b/logd/src/logd/rpc_forwarder.cpp
@@ -31,7 +31,7 @@ public:
: _request(new FRT_RPCRequest())
{}
~GuardedRequest() {
- _request->SubRef();
+ _request->internal_subref();
}
FRT_RPCRequest& operator*() const { return *_request; }
FRT_RPCRequest* get() const { return _request; }
diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h
index 75d41259eda..864c7b666cb 100644
--- a/logd/src/logd/rpc_forwarder.h
+++ b/logd/src/logd/rpc_forwarder.h
@@ -17,7 +17,7 @@ struct Metrics;
struct RpcTargetSubRef {
void operator()(FRT_Target* target) const noexcept {
- target->SubRef();
+ target->internal_subref();
}
};
using RpcTargetGuard = std::unique_ptr<FRT_Target, RpcTargetSubRef>;
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 8c67424d5f2..e12313af53c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -93,7 +93,7 @@ RPCSend::handleDiscard(Context ctx)
ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR));
FRT_RPCRequest &req = tmp->getRequest();
FNET_Channel *chn = req.GetContext()._value.CHANNEL;
- req.SubRef();
+ req.internal_subref();
chn->Free();
}
@@ -189,7 +189,7 @@ RPCSend::doRequestDone(FRT_RPCRequest *req) {
reply->addError(error);
}
_net->getOwner().deliverReply(std::move(reply), ctx->getRecipient());
- req->SubRef();
+ req->internal_subref();
}
std::unique_ptr<Reply>
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
index 656ab081652..9c6ca9dff69 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
@@ -19,7 +19,7 @@ RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb) :
RPCTarget::~RPCTarget()
{
- _target.SubRef();
+ _target.internal_subref();
}
void
@@ -94,7 +94,7 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
_state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED);
}
_cond.notify_all();
- req->SubRef();
+ req->internal_subref();
}
} // namespace mbus
diff --git a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
index 8eb56d218fb..c88e4a283b5 100644
--- a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
+++ b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
@@ -72,11 +72,11 @@ public:
void finiRPC()
{
if (_req != nullptr) {
- _req->SubRef();
+ _req->internal_subref();
_req = nullptr;
}
if (_target != nullptr) {
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
}
if (_frt) {
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
index 606188f72c2..4c13ff7d762 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_cluster_controller.cpp
@@ -56,7 +56,7 @@ BmClusterController::propagate_cluster_state(uint32_t node_idx, bool distributor
auto target = target_resolver->resolve_rpc_target(storage_address, fake_bucket_id);
target->get()->InvokeSync(req, 10.0); // 10 seconds timeout
assert(!req->IsError());
- req->SubRef();
+ req->internal_subref();
}
void
diff --git a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
index 9c5e000c314..09fb0981a24 100644
--- a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
+++ b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
@@ -124,8 +124,8 @@ TEST_F(ProtoRpcAdapterTest, require_that_plain_rpc_ping_works) {
req->SetMethodName("frt.rpc.ping");
target->InvokeSync(req, 60.0);
EXPECT_TRUE(req->CheckReturnTypes(""));
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
}
TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_search_works) {
@@ -145,9 +145,9 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_search_works) {
EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online"));
adapter.set_online();
}
- rpc->SubRef();
+ rpc->internal_subref();
}
- target->SubRef();
+ target->internal_subref();
SearchProtocolMetrics &metrics = adapter.metrics();
EXPECT_EQ(metrics.query().latency.getCount(), 2);
EXPECT_GT(metrics.query().latency.getTotal(), 0.0);
@@ -180,9 +180,9 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_getDocsums_works) {
EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online"));
adapter.set_online();
}
- rpc->SubRef();
+ rpc->internal_subref();
}
- target->SubRef();
+ target->internal_subref();
SearchProtocolMetrics &metrics = adapter.metrics();
EXPECT_EQ(metrics.query().latency.getCount(), 0);
EXPECT_EQ(metrics.docsum().latency.getCount(), 2);
@@ -208,9 +208,9 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_ping_works) {
EXPECT_EQ(std::string(rpc->GetErrorMessage()), std::string("Server not online"));
adapter.set_online();
}
- rpc->SubRef();
+ rpc->internal_subref();
}
- target->SubRef();
+ target->internal_subref();
SearchProtocolMetrics &metrics = adapter.metrics();
EXPECT_EQ(metrics.query().latency.getCount(), 0);
EXPECT_EQ(metrics.docsum().latency.getCount(), 0);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp
index c44edfcec86..f5cc3c0b247 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp
@@ -54,7 +54,7 @@ Session::commit(const vespalib::ConstBufferRef & buf)
int retcode = _tlc.rpc(req);
retval = (retcode == 0);
if (retval) {
- req->SubRef();
+ req->internal_subref();
} else {
vespalib::string msg;
if (req->GetReturn() != nullptr) {
@@ -62,7 +62,7 @@ Session::commit(const vespalib::ConstBufferRef & buf)
} else {
msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
}
- req->SubRef();
+ req->internal_subref();
throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str()));
}
}
@@ -81,7 +81,7 @@ Session::status(SerialNum & b, SerialNum & e, size_t & count)
e = req->GetReturn()->GetValue(2)._intval64;
count = req->GetReturn()->GetValue(3)._intval64;
}
- req->SubRef();
+ req->internal_subref();
return (retval == 0);
}
@@ -93,7 +93,7 @@ Session::erase(const SerialNum & to)
req->GetParams()->AddString(_domain.c_str());
req->GetParams()->AddInt64(to);
int32_t retval(_tlc.rpc(req));
- req->SubRef();
+ req->internal_subref();
if (retval == 1) {
LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to);
}
@@ -113,7 +113,7 @@ Session::sync(const SerialNum &syncTo, SerialNum &syncedTo)
if (retval == 0) {
syncedTo = req->GetReturn()->GetValue(1)._intval64;
}
- req->SubRef();
+ req->internal_subref();
return (retval == 0);
}
@@ -138,7 +138,7 @@ bool
Session::init(FRT_RPCRequest *req)
{
int32_t retval(_tlc.rpc(req));
- req->SubRef();
+ req->internal_subref();
if (retval > 0) {
clear();
_sessionId = retval;
@@ -171,7 +171,7 @@ Session::run()
req->GetParams()->AddString(_domain.c_str());
req->GetParams()->AddInt32(_sessionId);
int32_t retval(_tlc.rpc(req));
- req->SubRef();
+ req->internal_subref();
return (retval == 0);
}
@@ -188,7 +188,7 @@ Session::close()
if ( (retval = _tlc.rpc(req)) > 0) {
std::this_thread::sleep_for(10ms);
}
- req->SubRef();
+ req->internal_subref();
} while ( retval == 1 );
}
return (retval == 0);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
index 17f06b189c6..8916a4cf0b5 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
@@ -80,7 +80,7 @@ void
TransLogClient::disconnect()
{
if (_target) {
- _target->SubRef();
+ _target->internal_subref();
}
}
@@ -91,7 +91,7 @@ TransLogClient::create(const vespalib::string & domain)
req->SetMethodName("createDomain");
req->GetParams()->AddString(domain.c_str());
int32_t retval(rpc(req));
- req->SubRef();
+ req->internal_subref();
return (retval == 0);
}
@@ -102,7 +102,7 @@ TransLogClient::remove(const vespalib::string & domain)
req->SetMethodName("deleteDomain");
req->GetParams()->AddString(domain.c_str());
int32_t retval(rpc(req));
- req->SubRef();
+ req->internal_subref();
return (retval == 0);
}
@@ -113,7 +113,7 @@ TransLogClient::open(const vespalib::string & domain)
req->SetMethodName("openDomain");
req->GetParams()->AddString(domain.c_str());
int32_t retval(rpc(req));
- req->SubRef();
+ req->internal_subref();
if (retval == 0) {
return std::make_unique<Session>(domain, *this);
}
@@ -138,7 +138,7 @@ TransLogClient::listDomains(std::vector<vespalib::string> & dir)
dir.push_back(d);
}
}
- req->SubRef();
+ req->internal_subref();
return (retval == 0);
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index c96b0cdcd61..db02f4f037e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -361,9 +361,9 @@ public:
RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection)
: _supervisor(supervisor), _connection(connection), _ok(true)
{
- _connection->AddRef();
+ _connection->internal_addref();
}
- ~RPCDestination() override { _connection->SubRef(); }
+ ~RPCDestination() override { _connection->internal_subref(); }
bool ok() const override {
return _ok;
@@ -395,7 +395,7 @@ private:
if ( ! ((retval == client::RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
}
- req->SubRef();
+ req->internal_subref();
return (retval == client::RPC::OK);
}
diff --git a/slobrok/src/apps/check_slobrok/check_slobrok.cpp b/slobrok/src/apps/check_slobrok/check_slobrok.cpp
index 1b69588a9fc..4e10c9ba6fe 100644
--- a/slobrok/src/apps/check_slobrok/check_slobrok.cpp
+++ b/slobrok/src/apps/check_slobrok/check_slobrok.cpp
@@ -55,7 +55,7 @@ void
Slobrok_Checker::finiRPC()
{
if (_target != nullptr) {
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
}
if (_server) {
diff --git a/slobrok/src/apps/sbcmd/sbcmd.cpp b/slobrok/src/apps/sbcmd/sbcmd.cpp
index 64c29cd92a9..51c8b81aa77 100644
--- a/slobrok/src/apps/sbcmd/sbcmd.cpp
+++ b/slobrok/src/apps/sbcmd/sbcmd.cpp
@@ -65,7 +65,7 @@ void
Slobrok_CMD::finiRPC()
{
if (_target != nullptr) {
- _target->SubRef();
+ _target->internal_subref();
_target = nullptr;
}
if (_server) {
@@ -181,7 +181,7 @@ Slobrok_CMD::main(int argc, char **argv)
}
}
}
- req->SubRef();
+ req->internal_subref();
finiRPC();
return 0;
}
diff --git a/slobrok/src/tests/mirrorapi/mirrorapi.cpp b/slobrok/src/tests/mirrorapi/mirrorapi.cpp
index 2e03ba52bd4..6dac6b22b05 100644
--- a/slobrok/src/tests/mirrorapi/mirrorapi.cpp
+++ b/slobrok/src/tests/mirrorapi/mirrorapi.cpp
@@ -66,8 +66,8 @@ Server::reg()
FRT_Target *sb = _server.supervisor().GetTarget(_slobrokSpec.c_str());
sb->InvokeSync(req, 5.0);
- sb->SubRef();
- req->SubRef();
+ sb->internal_subref();
+ req->internal_subref();
}
diff --git a/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp b/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp
index 13db95eb35c..0493a43adf3 100644
--- a/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp
+++ b/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp
@@ -21,11 +21,11 @@ struct Server : FRT_Invokable {
FNET_Connection *last_conn;
void set_last_conn(FNET_Connection *conn) {
if (last_conn) {
- last_conn->SubRef();
+ last_conn->internal_subref();
}
last_conn = conn;
if (last_conn) {
- last_conn->AddRef();
+ last_conn->internal_addref();
}
}
Server(fnet::TimeTools::SP time_tools)
@@ -210,7 +210,7 @@ TEST_F(RpcMappingMonitorTest, up_connection_is_reused) {
a.last_conn = nullptr;
EXPECT_TRUE(debugger.step_until([&]() { return (a.last_conn); }));
EXPECT_EQ(a.last_conn, my_conn);
- my_conn->SubRef();
+ my_conn->internal_subref();
EXPECT_EQ(hist.map[foo_a].state(), State::UP);
}
diff --git a/slobrok/src/tests/standalone/standalone.cpp b/slobrok/src/tests/standalone/standalone.cpp
index 33813fceb3a..653e4c64b0e 100644
--- a/slobrok/src/tests/standalone/standalone.cpp
+++ b/slobrok/src/tests/standalone/standalone.cpp
@@ -77,7 +77,7 @@ private:
public:
SubReferer(T* &t) : _t(t) {}
~SubReferer() {
- if (_t != nullptr) _t->SubRef();
+ if (_t != nullptr) _t->internal_subref();
}
};
@@ -134,7 +134,7 @@ TEST("standalone") {
}
fprintf(stderr, "ping failed [retry %d]\n", retry);
std::this_thread::sleep_for(200ms);
- sb->SubRef();
+ sb->internal_subref();
sb = orb.GetTarget(18541);
}
ASSERT_TRUE(checkOk(req));
diff --git a/slobrok/src/tests/startsome/rpc_info.cpp b/slobrok/src/tests/startsome/rpc_info.cpp
index a37cfa24889..be6d59f6a81 100644
--- a/slobrok/src/tests/startsome/rpc_info.cpp
+++ b/slobrok/src/tests/startsome/rpc_info.cpp
@@ -13,16 +13,16 @@ public:
void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor)
{
if ((*req) != nullptr)
- (*req)->SubRef();
+ (*req)->internal_subref();
(*req) = supervisor->AllocRPCRequest();
}
void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2)
{
if (r1 != nullptr)
- r1->SubRef();
+ r1->internal_subref();
if (r2 != nullptr)
- r2->SubRef();
+ r2->internal_subref();
}
void DumpMethodInfo(const char *indent, FRT_RPCRequest *info,
@@ -123,7 +123,7 @@ public:
m_list->GetErrorMessage());
}
FreeReqs(m_list, info);
- target->SubRef();
+ target->internal_subref();
return 0;
}
};
diff --git a/slobrok/src/vespa/slobrok/sbmirror.cpp b/slobrok/src/vespa/slobrok/sbmirror.cpp
index 62d288d40dc..3936b4dac4c 100644
--- a/slobrok/src/vespa/slobrok/sbmirror.cpp
+++ b/slobrok/src/vespa/slobrok/sbmirror.cpp
@@ -44,10 +44,10 @@ MirrorAPI::~MirrorAPI()
_configurator.reset(0);
if (_req != 0) {
_req->Abort();
- _req->SubRef();
+ _req->internal_subref();
}
if (_target != 0) {
- _target->SubRef();
+ _target->internal_subref();
}
}
@@ -208,7 +208,7 @@ MirrorAPI::handleReconfig()
std::string cps = _slobrokSpecs.logString();
LOG(warning, "current server %s not in list of location brokers: %s",
_currSlobrok.c_str(), cps.c_str());
- _target->SubRef();
+ _target->internal_subref();
_target = 0;
}
}
@@ -224,7 +224,7 @@ MirrorAPI::handleReqDone()
if (reconn) {
if (_target != 0) {
- _target->SubRef();
+ _target->internal_subref();
}
_target = 0;
} else {
diff --git a/slobrok/src/vespa/slobrok/sbregister.cpp b/slobrok/src/vespa/slobrok/sbregister.cpp
index 925d4ea62bc..e7db255c5d6 100644
--- a/slobrok/src/vespa/slobrok/sbregister.cpp
+++ b/slobrok/src/vespa/slobrok/sbregister.cpp
@@ -90,10 +90,10 @@ RegisterAPI::~RegisterAPI()
_configurator.reset(0);
if (_req != 0) {
_req->Abort();
- _req->SubRef();
+ _req->internal_subref();
}
if (_target != 0) {
- _target->SubRef();
+ _target->internal_subref();
}
}
@@ -139,7 +139,7 @@ RegisterAPI::handleReqDone()
// unexpected error; close our connection to this
// slobrok server and try again with a fresh slate
if (_target != 0) {
- _target->SubRef();
+ _target->internal_subref();
}
_target = 0;
_busy.store(true, std::memory_order_relaxed);
@@ -159,7 +159,7 @@ RegisterAPI::handleReqDone()
// reset backoff strategy on any successful request
_backOff.reset();
}
- _req->SubRef();
+ _req->internal_subref();
_req = 0;
}
}
@@ -173,7 +173,7 @@ RegisterAPI::handleReconnect()
vespalib::string cps = _slobrokSpecs.logString();
LOG(warning, "[RPC @ %s] location broker %s removed, will disconnect and use one of: %s",
createSpec(_orb).c_str(), _currSlobrok.c_str(), cps.c_str());
- _target->SubRef();
+ _target->internal_subref();
_target = 0;
}
}
diff --git a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
index 4b8c1c02252..94def0271c8 100644
--- a/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
+++ b/slobrok/src/vespa/slobrok/server/exchange_manager.cpp
@@ -160,7 +160,7 @@ ExchangeManager::WorkPackage::WorkItem::RequestDone(FRT_RPCRequest *req)
LOG(warning, "error doing workitem: %s", req->GetErrorMessage());
// XXX tell remslob?
}
- req->SubRef();
+ req->internal_subref();
_pendingReq = nullptr;
_pkg.doneItem(denied);
}
diff --git a/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp b/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp
index 12761c0e6e9..a7013f6e04e 100644
--- a/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp
+++ b/slobrok/src/vespa/slobrok/server/managed_rpc_server.cpp
@@ -59,7 +59,7 @@ ManagedRpcServer::cleanupMonitor()
{
_monitor.disable();
if (_monitoredServer != nullptr) {
- _monitoredServer->SubRef();
+ _monitoredServer->internal_subref();
_monitoredServer = nullptr;
}
if (_checkServerReq != nullptr) {
@@ -100,7 +100,7 @@ ManagedRpcServer::RequestDone(FRT_RPCRequest *req)
if (req->GetErrorCode() == FRTE_RPC_ABORT) {
LOG(debug, "rpcserver[%s].check aborted", getName().c_str());
- req->SubRef();
+ req->internal_subref();
_checkServerReq = nullptr;
return;
}
@@ -119,7 +119,7 @@ ManagedRpcServer::RequestDone(FRT_RPCRequest *req)
} else {
errmsg = "checkServer failed validation";
}
- req->SubRef();
+ req->internal_subref();
_checkServerReq = nullptr;
cleanupMonitor();
_mmanager.notifyFailedRpcSrv(this, errmsg);
@@ -130,7 +130,7 @@ ManagedRpcServer::RequestDone(FRT_RPCRequest *req)
LOG_ASSERT(_monitoredServer != nullptr);
_monitor.enable(_monitoredServer);
- req->SubRef();
+ req->internal_subref();
_checkServerReq = nullptr;
_mmanager.notifyOkRpcSrv(this);
}
diff --git a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
index aa891ae0048..0f6fffb7f4f 100644
--- a/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
+++ b/slobrok/src/vespa/slobrok/server/remote_slobrok.cpp
@@ -32,7 +32,7 @@ void RemoteSlobrok::shutdown() {
_reconnecter.disable();
if (_remote != nullptr) {
- _remote->SubRef();
+ _remote->internal_subref();
_remote = nullptr;
}
@@ -110,7 +110,7 @@ void RemoteSlobrok::handleFetchResult() {
_serviceMapMirror.clear();
success = false;
}
- _remFetchReq->SubRef();
+ _remFetchReq->internal_subref();
_remFetchReq = nullptr;
if (success) {
maybeStartFetch();
@@ -132,12 +132,12 @@ RemoteSlobrok::RequestDone(FRT_RPCRequest *req)
const char *myspec = args[1]._string._str;
LOG(info, "addPeer(%s, %s) on remote slobrok %s at %s: %s",
myname, myspec, getName().c_str(), getSpec().c_str(), req->GetErrorMessage());
- req->SubRef();
+ req->internal_subref();
_remAddPeerReq = nullptr;
fail();
return;
}
- req->SubRef();
+ req->internal_subref();
_remAddPeerReq = nullptr;
} else {
LOG(error, "got unknown request back in RequestDone()");
@@ -166,7 +166,7 @@ RemoteSlobrok::fail()
{
// disconnect
if (_remote != nullptr) {
- _remote->SubRef();
+ _remote->internal_subref();
_remote = nullptr;
}
// schedule reconnect attempt
diff --git a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
index bac1ab34574..ad410eb93e8 100644
--- a/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/cluster_controller_rpc_api_service_test.cpp
@@ -67,7 +67,7 @@ struct FixtureBase {
// instance _before_ we destroy the request itself.
dispatcher._enqueued.clear();
if (bound_request) {
- bound_request->SubRef();
+ bound_request->internal_subref();
}
}
};
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 9a98a40e7eb..bfc22b9f1ea 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -204,8 +204,8 @@ public:
EXPECT_TRUE(req->IsError());
EXPECT_EQ(req->GetErrorCode(), FRTE_RPC_METHOD_FAILED);
EXPECT_EQ(req->GetErrorMessage(), expected_msg);
- target->SubRef();
- req->SubRef();
+ target->internal_subref();
+ req->internal_subref();
}
};
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index 3f015d91a4a..5e4cb9d3026 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -35,7 +35,7 @@ public:
_spec(spec)
{}
~RpcTargetImpl() override {
- _target->SubRef();
+ _target->internal_subref();
}
FRT_Target* get() noexcept override { return _target; }
bool is_valid() const noexcept override { return _target->IsValid(); }
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index bcb5dbab279..e494f4e67da 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -83,7 +83,7 @@ namespace {
struct SubRefDeleter {
template <typename T>
void operator()(T* v) const noexcept {
- v->SubRef();
+ v->internal_subref();
}
};
diff --git a/storage/src/vespa/storage/tools/storage-cmd.cpp b/storage/src/vespa/storage/tools/storage-cmd.cpp
index 12299c7458e..bc932fcf6fd 100644
--- a/storage/src/vespa/storage/tools/storage-cmd.cpp
+++ b/storage/src/vespa/storage/tools/storage-cmd.cpp
@@ -90,7 +90,7 @@ public:
req->GetErrorMessage());
continue;
}
- req->SubRef();
+ req->internal_subref();
}
FRT_RPCRequest *req = supervisor.supervisor().AllocRPCRequest();
@@ -115,8 +115,8 @@ public:
req->GetErrorMessage());
}
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
}
return retCode;
}
diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
index f70ffcc2655..50311e772e2 100644
--- a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
+++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
@@ -501,8 +501,8 @@ Application::isService(FRT_Supervisor &frt, const std::string &spec) const
}
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
return ret;
}
diff --git a/vespalib/src/tests/ref_counted/ref_counted_test.cpp b/vespalib/src/tests/ref_counted/ref_counted_test.cpp
index d831e664374..2a5321b7b27 100644
--- a/vespalib/src/tests/ref_counted/ref_counted_test.cpp
+++ b/vespalib/src/tests/ref_counted/ref_counted_test.cpp
@@ -118,6 +118,7 @@ TEST(RefCountedTest, use_internal_api) {
EXPECT_EQ(raw->count_refs(), 1);
ref_counted<Base> ref = ref_counted<Base>::internal_attach(raw);
EXPECT_EQ(ref->count_refs(), 1);
+ EXPECT_EQ(ref->val, 20);
EXPECT_EQ(ref.internal_detach(), raw);
EXPECT_EQ(raw->count_refs(), 1);
raw->internal_addref();
@@ -127,6 +128,18 @@ TEST(RefCountedTest, use_internal_api) {
raw->internal_subref();
}
+TEST(RefCountedTest, use_multi_ref_internal_api) {
+ CheckObjects check(1);
+ Base *raw = new Base(20);
+ EXPECT_EQ(raw->count_refs(), 1);
+ raw->internal_addref(9);
+ EXPECT_EQ(raw->count_refs(), 10);
+ EXPECT_EQ(raw->val, 20);
+ raw->internal_subref(6, 4);
+ EXPECT_EQ(raw->count_refs(), 4);
+ raw->internal_subref(4, 0);
+}
+
TEST(RefCountedTest, move_ref_counted) {
for (bool has_src: {true, false}) {
for (bool has_dst: {true, false}) {
diff --git a/vespalib/src/vespa/vespalib/util/ref_counted.cpp b/vespalib/src/vespa/vespalib/util/ref_counted.cpp
index a6928082c58..43a4647e9ec 100644
--- a/vespalib/src/vespa/vespalib/util/ref_counted.cpp
+++ b/vespalib/src/vespa/vespalib/util/ref_counted.cpp
@@ -6,22 +6,22 @@
namespace vespalib {
void
-enable_ref_counted::internal_addref() const noexcept
+enable_ref_counted::internal_addref(uint32_t cnt) const noexcept
{
// relaxed because:
// the thread obtaining the new reference already has a reference
- auto prev = _refs.fetch_add(1, std::memory_order_relaxed);
+ auto prev = _refs.fetch_add(cnt, std::memory_order_relaxed);
assert(prev > 0);
}
void
-enable_ref_counted::internal_subref() const noexcept
+enable_ref_counted::internal_subref(uint32_t cnt, [[maybe_unused]] uint32_t reserve) const noexcept
{
// release because:
// our changes to the object must be visible to the deleter
- auto prev = _refs.fetch_sub(1, std::memory_order_release);
- assert(prev > 0);
- if (prev == 1) {
+ auto prev = _refs.fetch_sub(cnt, std::memory_order_release);
+ assert(prev >= (reserve + cnt));
+ if (prev == cnt) {
// acquire because:
// we need to see all object changes before deleting it
std::atomic_thread_fence(std::memory_order_acquire);
@@ -29,7 +29,7 @@ enable_ref_counted::internal_subref() const noexcept
}
}
-int32_t
+uint32_t
enable_ref_counted::count_refs() const noexcept {
auto result = _refs.load(std::memory_order_relaxed);
assert(result > 0);
diff --git a/vespalib/src/vespa/vespalib/util/ref_counted.h b/vespalib/src/vespa/vespalib/util/ref_counted.h
index 0b75cc51d7a..aff8ae7bb7e 100644
--- a/vespalib/src/vespa/vespalib/util/ref_counted.h
+++ b/vespalib/src/vespa/vespalib/util/ref_counted.h
@@ -27,7 +27,7 @@ class enable_ref_counted
static constexpr uint32_t MAGIC = 0xcc56a933;
private:
uint32_t _guard;
- mutable std::atomic<int32_t> _refs;
+ mutable std::atomic<uint32_t> _refs;
protected:
enable_ref_counted() noexcept : _guard(MAGIC), _refs(1) {}
public:
@@ -36,9 +36,11 @@ public:
enable_ref_counted(const enable_ref_counted &) = delete;
enable_ref_counted &operator=(enable_ref_counted &&) = delete;
enable_ref_counted &operator=(const enable_ref_counted &) = delete;
- void internal_addref() const noexcept;
- void internal_subref() const noexcept;
- int32_t count_refs() const noexcept;
+ void internal_addref(uint32_t cnt) const noexcept;
+ void internal_addref() const noexcept { internal_addref(1); }
+ void internal_subref(uint32_t cnt, uint32_t reserve) const noexcept;
+ void internal_subref() const noexcept { internal_subref(1, 0); }
+ uint32_t count_refs() const noexcept;
};
// This is the handle to a shared object. The handle itself is not