summaryrefslogtreecommitdiffstats
path: root/fnet/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'fnet/src/tests')
-rw-r--r--fnet/src/tests/connect/connect_test.cpp71
-rw-r--r--fnet/src/tests/connection_spread/connection_spread_test.cpp4
-rw-r--r--fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp20
-rw-r--r--fnet/src/tests/frt/method_pt/method_pt.cpp8
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp4
-rw-r--r--fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp6
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp10
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp6
-rw-r--r--fnet/src/tests/frt/rpc/sharedblob.cpp10
-rw-r--r--fnet/src/tests/info/info.cpp6
-rw-r--r--fnet/src/tests/printstuff/printstuff_test.cpp2
-rw-r--r--fnet/src/tests/transport_debugger/transport_debugger_test.cpp6
12 files changed, 83 insertions, 70 deletions
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp
index 9d566cf37a7..d635fea6f94 100644
--- a/fnet/src/tests/connect/connect_test.cpp
+++ b/fnet/src/tests/connect/connect_test.cpp
@@ -86,25 +86,22 @@ struct BlockingCryptoEngine : public CryptoEngine {
//-----------------------------------------------------------------------------
-struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
+struct TransportFixture : FNET_IPacketHandler {
FNET_SimplePacketStreamer streamer;
FNET_Transport transport;
Gate conn_lost;
- Gate conn_deleted;
- TransportFixture() : streamer(nullptr), transport(),
- conn_lost(), conn_deleted()
- {
+ TransportFixture() : streamer(nullptr), transport(), conn_lost() {
transport.Start();
}
TransportFixture(AsyncResolver::HostResolver::SP host_resolver)
: streamer(nullptr), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))),
- conn_lost(), conn_deleted()
+ conn_lost()
{
transport.Start();
}
TransportFixture(CryptoEngine::SP crypto)
: streamer(nullptr), transport(fnet::TransportConfig().crypto(std::move(crypto))),
- conn_lost(), conn_deleted()
+ conn_lost()
{
transport.Start();
}
@@ -114,14 +111,12 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
packet->Free();
return FNET_FREE_CHANNEL;
}
- void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); }
FNET_Connection *connect(const vespalib::string &spec) {
FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer);
ASSERT_TRUE(conn != nullptr);
if (conn->OpenChannel(this, FNET_Context()) == nullptr) {
conn_lost.countDown();
}
- conn->SetCleanupHandler(this);
return conn;
}
~TransportFixture() override {
@@ -131,8 +126,26 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
//-----------------------------------------------------------------------------
-TEST_MT_FFF("require that normal connect works", 2,
- ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60))
+struct ConnCheck {
+ uint64_t target;
+ ConnCheck() : target(FNET_Connection::get_num_connections()) {
+ EXPECT_EQUAL(target, uint64_t(0));
+ }
+ bool at_target() const { return (FNET_Connection::get_num_connections() == target); };
+ bool await(duration max_wait) const {
+ auto until = saturated_add(steady_clock::now(), max_wait);
+ while (!at_target() && steady_clock::now() < until) {
+ std::this_thread::sleep_for(1ms);
+ }
+ return at_target();
+ }
+ void await() const {
+ ASSERT_TRUE(await(3600s));
+ }
+};
+
+TEST_MT_FFFF("require that normal connect works", 2,
+ ServerSocket("tcp/0"), TransportFixture(), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -144,23 +157,23 @@ TEST_MT_FFF("require that normal connect works", 2,
TEST_BARRIER();
conn->Owner()->Close(conn);
f2.conn_lost.await();
- EXPECT_TRUE(!f2.conn_deleted.await(short_time));
- conn->SubRef();
- f2.conn_deleted.await();
+ EXPECT_TRUE(!f3.await(short_time));
+ conn->internal_subref();
+ f3.await();
}
}
-TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) {
+TEST_FFF("require that bogus connect fail asynchronously", TransportFixture(), ConnCheck(), TimeBomb(60)) {
FNET_Connection *conn = f1.connect("invalid");
f1.conn_lost.await();
- EXPECT_TRUE(!f1.conn_deleted.await(short_time));
- conn->SubRef();
- f1.conn_deleted.await();
+ EXPECT_TRUE(!f2.await(short_time));
+ conn->internal_subref();
+ f2.await();
}
-TEST_MT_FFFF("require that async close can be called before async resolve completes", 2,
- ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()),
- TransportFixture(f2), TimeBomb(60))
+TEST_MT_FFFFF("require that async close can be called before async resolve completes", 2,
+ ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()),
+ TransportFixture(f2), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -172,16 +185,16 @@ TEST_MT_FFFF("require that async close can be called before async resolve comple
conn->Owner()->Close(conn);
f3.conn_lost.await();
f2->release_caller();
- EXPECT_TRUE(!f3.conn_deleted.await(short_time));
- conn->SubRef();
- f3.conn_deleted.await();
+ EXPECT_TRUE(!f4.await(short_time));
+ conn->internal_subref();
+ f4.await();
f1.shutdown();
}
}
-TEST_MT_FFFF("require that async close during async do_handshake_work works", 2,
- ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()),
- TransportFixture(f2), TimeBomb(60))
+TEST_MT_FFFFF("require that async close during async do_handshake_work works", 2,
+ ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()),
+ TransportFixture(f2), ConnCheck(), TimeBomb(60))
{
if (thread_id == 0) {
SocketHandle socket = f1.accept();
@@ -196,10 +209,10 @@ TEST_MT_FFFF("require that async close during async do_handshake_work works", 2,
f3.conn_lost.await();
TEST_BARRIER(); // #1
// verify that pending work keeps relevant objects alive
- EXPECT_TRUE(!f3.conn_deleted.await(short_time));
+ EXPECT_TRUE(!f4.await(short_time));
EXPECT_TRUE(!f2->handshake_socket_deleted.await(short_time));
f2->handshake_work_exit.countDown();
- f3.conn_deleted.await();
+ f4.await();
f2->handshake_socket_deleted.await();
}
}
diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp
index 6286ce65657..5908e6a4982 100644
--- a/fnet/src/tests/connection_spread/connection_spread_test.cpp
+++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp
@@ -84,9 +84,9 @@ TEST_F("require that connections are spread among transport threads", Fixture)
f1.wait_for_components(256, 257);
check_threads(f1.client, 8, "client");
check_threads(f1.server, 8, "server");
- listener->SubRef();
+ listener->internal_subref();
for (FNET_Connection *conn: connections) {
- conn->SubRef();
+ conn->internal_subref();
}
}
diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
index 6325c60413a..c5ca6dc6ce9 100644
--- a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
+++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp
@@ -36,7 +36,7 @@ struct RpcFixture : FRT_Invokable {
}
~RpcFixture() {
if (back_conn.load() != nullptr) {
- back_conn.load()->SubRef();
+ back_conn.load()->internal_subref();
}
}
uint32_t port() const { return orb.GetListenPort(); }
@@ -61,7 +61,7 @@ struct RpcFixture : FRT_Invokable {
ASSERT_TRUE(back_conn.load() == nullptr);
back_conn.store(req->GetConnection());
ASSERT_TRUE(back_conn.load() != nullptr);
- back_conn.load()->AddRef();
+ back_conn.load()->internal_addref();
}
FRT_Target *meta_connect(uint32_t port) {
auto *target = orb.Get2WayTarget(fmt("tcp/localhost:%u", port).c_str());
@@ -69,7 +69,7 @@ struct RpcFixture : FRT_Invokable {
req->SetMethodName("connect");
target->InvokeSync(req, 300.0);
ASSERT_TRUE(req->CheckReturnTypes(""));
- req->SubRef();
+ req->internal_subref();
return target;
};
static int check_result(FRT_RPCRequest *req, uint64_t expect) {
@@ -81,7 +81,7 @@ struct RpcFixture : FRT_Invokable {
ASSERT_EQUAL(ret, expect);
++num_ok;
}
- req->SubRef();
+ req->internal_subref();
return num_ok;
}
static int verify_rpc(FNET_Connection *conn) {
@@ -101,7 +101,7 @@ struct RpcFixture : FRT_Invokable {
int verify_rpc(FRT_Target *target, uint32_t port) {
auto *my_target = connect(port);
int num_ok = verify_rpc(target) + verify_rpc(my_target) + verify_rpc(back_conn.load());
- my_target->SubRef();
+ my_target->internal_subref();
return num_ok;
}
};
@@ -135,8 +135,8 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
EXPECT_EQUAL(RpcFixture::verify_rpc(target), 0); // outgoing 2way target should be closed
EXPECT_EQUAL(RpcFixture::verify_rpc(client_target), 1); // pure client target should not be closed
TEST_BARRIER(); // #5
- target->SubRef();
- client_target->SubRef();
+ target->internal_subref();
+ client_target->internal_subref();
} else if (thread_id == 1) { // server 2 (talks to client 2)
auto self = std::make_unique<RpcFixture>(f1);
f3 = self->port();
@@ -146,7 +146,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #3
TEST_BARRIER(); // #4
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
} else if (thread_id == 2) { // client 1 (talks to server 1)
auto self = std::make_unique<RpcFixture>(f1);
f4 = self->port();
@@ -165,7 +165,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #4
EXPECT_EQUAL(self->verify_rpc(target, f2), 0);
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
} else { // client 2 (talks to server 2)
ASSERT_EQUAL(thread_id, 3u);
auto self = std::make_unique<RpcFixture>(f1);
@@ -179,7 +179,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic
TEST_BARRIER(); // #4
EXPECT_EQUAL(self->verify_rpc(target, f3), 3);
TEST_BARRIER(); // #5
- target->SubRef();
+ target->internal_subref();
}
}
diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp
index 2ac706369ae..d6c42ef7790 100644
--- a/fnet/src/tests/frt/method_pt/method_pt.cpp
+++ b/fnet/src/tests/frt/method_pt/method_pt.cpp
@@ -174,7 +174,7 @@ void finiTest() {
delete _complexHandler;
delete _mediumHandler;
delete _simpleHandler;
- _target->SubRef();
+ _target->internal_subref();
_server.reset();
}
@@ -187,7 +187,7 @@ TEST("method pt") {
//-------------------------------- MEDIUM
- req->SubRef();
+ req->internal_subref();
req = FRT_Supervisor::AllocRPCRequest();
req->SetMethodName("mediumMethod");
_target->InvokeSync(req, 60.0);
@@ -195,7 +195,7 @@ TEST("method pt") {
//-------------------------------- COMPLEX
- req->SubRef();
+ req->internal_subref();
req = FRT_Supervisor::AllocRPCRequest();
req->SetMethodName("complexMethod");
_target->InvokeSync(req, 60.0);
@@ -213,7 +213,7 @@ TEST("method pt") {
fprintf(stderr, "Object inheritance NOT ok for method handlers\n");
}
- req->SubRef();
+ req->internal_subref();
}
//-------------------------------------------------------------
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index 41bfb7d06a6..74f69541d8b 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -125,8 +125,8 @@ void perform_test(size_t thread_id, Client &client, Result &result, bool vital =
BenchmarkTimer::benchmark(invoke, invoke, 0.5);
EXPECT_GREATER_EQUAL(seq, loop_cnt);
result.req_per_sec[thread_id] = double(loop_cnt) / t;
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
TEST_BARRIER();
if (thread_id == 0) {
result.print();
diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
index d15fca93c0b..812f0a57c5e 100644
--- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
@@ -86,7 +86,7 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) {
auto invoke = [&seq, &target, &req, &fixture, reconnect](){
TT_Sample sample(req_tag);
if (reconnect) {
- target->SubRef();
+ target->internal_subref();
target = fixture.connect();
}
req = fixture.orb.AllocRPCRequest(req);
@@ -101,8 +101,8 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) {
auto before = TimeTracer::now();
double t = BenchmarkTimer::benchmark(invoke, 5.0);
auto after = TimeTracer::now();
- target->SubRef();
- req->SubRef();
+ target->internal_subref();
+ req->internal_subref();
auto stats = TimeTracer::extract().by_time(before, after).by_tag(req_tag.id()).get();
ASSERT_TRUE(stats.size() > 0);
std::sort(stats.begin(), stats.end(), DurationCmp());
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index 17c38ab6e3a..9a0f1778cb6 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -31,14 +31,14 @@ struct Server : public FRT_Invokable
void rpc_hook(FRT_RPCRequest *req) {
FNET_Connection *conn = req->GetConnection();
- conn->AddRef(); // need to keep it alive
+ conn->internal_addref(); // need to keep it alive
req->Detach();
req->Return(); // will free request channel
FRT_RPCRequest *r = orb.AllocRPCRequest();
r->SetMethodName("frt.rpc.ping");
// might re-use request channel before it is unlinked from hashmap
orb.InvokeAsync(orb.GetTransport(), conn, r, 5.0, &receptor);
- conn->SubRef(); // invocation will now keep the connection alive as needed
+ conn->internal_subref(); // invocation will now keep the connection alive as needed
}
};
@@ -61,11 +61,11 @@ TEST("detach return invoke") {
}
std::this_thread::sleep_for(10ms);
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
if (receptor.req.load() != nullptr) {
EXPECT_TRUE(!receptor.req.load()->IsError());
- receptor.req.load()->SubRef();
+ receptor.req.load()->internal_subref();
}
EXPECT_TRUE(receptor.req.load() != nullptr);
};
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index e930c1252bf..f06c7428c22 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -65,7 +65,7 @@ public:
}
~MyReq() {
if (_req != nullptr) {
- _req->SubRef();
+ _req->internal_subref();
}
}
MyReq(const MyReq &rhs) = delete;
@@ -331,7 +331,7 @@ public:
}
~Fixture() {
- _target->SubRef();
+ _target->internal_subref();
}
};
@@ -410,7 +410,7 @@ TEST_F("require that a bad target gives connection error", Fixture()) {
{
FRT_Target *bad_target = f1.make_bad_target();
bad_target->InvokeSync(req.borrow(), timeout);
- bad_target->SubRef();
+ bad_target->internal_subref();
}
EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_CONNECTION);
}
diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp
index 2ccb44d03cb..94f57a136a4 100644
--- a/fnet/src/tests/frt/rpc/sharedblob.cpp
+++ b/fnet/src/tests/frt/rpc/sharedblob.cpp
@@ -122,7 +122,7 @@ struct ServerSampler : public FRT_Invokable
dataSet.sample(*req->GetReturn()); // server return before drop
// keep request to sample return after drop
- req->AddRef();
+ req->internal_addref();
serverReq = req;
}
};
@@ -176,7 +176,7 @@ TEST("testExplicitShared") {
req->GetParams()->AddSharedData(&blob);
EXPECT_EQUAL(4, blob.refcnt);
- req->SubRef();
+ req->internal_subref();
EXPECT_EQUAL(1, blob.refcnt);
}
@@ -262,10 +262,10 @@ TEST("testImplicitShared") {
}
if (serverSampler.serverReq != 0) {
- serverSampler.serverReq->SubRef();
+ serverSampler.serverReq->internal_subref();
}
- req->SubRef();
- target->SubRef();
+ req->internal_subref();
+ target->internal_subref();
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index 00075cb75dd..92fe6526a10 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -63,9 +63,9 @@ TEST("info") {
fprintf(stderr, "FD_SETSIZE: %d\n", l[2]._intval32);
fprintf(stderr, "sizeof(FRT_RPCRequest): %d\n", l[3]._intval32);
- target->SubRef();
- local_info->SubRef();
- remote_info->SubRef();
+ target->internal_subref();
+ local_info->internal_subref();
+ remote_info->internal_subref();
};
TEST("size of important objects")
diff --git a/fnet/src/tests/printstuff/printstuff_test.cpp b/fnet/src/tests/printstuff/printstuff_test.cpp
index a9621728c5a..bd76ad29405 100644
--- a/fnet/src/tests/printstuff/printstuff_test.cpp
+++ b/fnet/src/tests/printstuff/printstuff_test.cpp
@@ -37,7 +37,7 @@ TEST("rpc packets in a queue") {
q2.QueuePacket(&req->getStash().create<FRT_RPCRequestPacket>(req, 0, false), FNET_Context());
q2.Print();
}
- req->SubRef();
+ req->internal_subref();
}
TEST("info") {
diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
index a363b1df4c2..eaf2fd71bde 100644
--- a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
+++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
@@ -113,9 +113,9 @@ TEST_FF("transport layers can be run with transport debugger", Fixture(), vespal
EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT);
ASSERT_TRUE(req6->CheckReturnTypes("l"));
EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u);
- target->SubRef();
- req4->SubRef();
- req6->SubRef();
+ target->internal_subref();
+ req4->internal_subref();
+ req6->internal_subref();
}
TEST_MAIN() { TEST_RUN_ALL(); }