diff options
Diffstat (limited to 'fnet/src/tests')
-rw-r--r-- | fnet/src/tests/connect/connect_test.cpp | 71 | ||||
-rw-r--r-- | fnet/src/tests/connection_spread/connection_spread_test.cpp | 4 | ||||
-rw-r--r-- | fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp | 20 | ||||
-rw-r--r-- | fnet/src/tests/frt/method_pt/method_pt.cpp | 8 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp | 4 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp | 6 | ||||
-rw-r--r-- | fnet/src/tests/frt/rpc/detach_return_invoke.cpp | 10 | ||||
-rw-r--r-- | fnet/src/tests/frt/rpc/invoke.cpp | 6 | ||||
-rw-r--r-- | fnet/src/tests/frt/rpc/sharedblob.cpp | 10 | ||||
-rw-r--r-- | fnet/src/tests/info/info.cpp | 6 | ||||
-rw-r--r-- | fnet/src/tests/printstuff/printstuff_test.cpp | 2 | ||||
-rw-r--r-- | fnet/src/tests/transport_debugger/transport_debugger_test.cpp | 6 |
12 files changed, 83 insertions, 70 deletions
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 9d566cf37a7..d635fea6f94 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -86,25 +86,22 @@ struct BlockingCryptoEngine : public CryptoEngine { //----------------------------------------------------------------------------- -struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { +struct TransportFixture : FNET_IPacketHandler { FNET_SimplePacketStreamer streamer; FNET_Transport transport; Gate conn_lost; - Gate conn_deleted; - TransportFixture() : streamer(nullptr), transport(), - conn_lost(), conn_deleted() - { + TransportFixture() : streamer(nullptr), transport(), conn_lost() { transport.Start(); } TransportFixture(AsyncResolver::HostResolver::SP host_resolver) : streamer(nullptr), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))), - conn_lost(), conn_deleted() + conn_lost() { transport.Start(); } TransportFixture(CryptoEngine::SP crypto) : streamer(nullptr), transport(fnet::TransportConfig().crypto(std::move(crypto))), - conn_lost(), conn_deleted() + conn_lost() { transport.Start(); } @@ -114,14 +111,12 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { packet->Free(); return FNET_FREE_CHANNEL; } - void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); } FNET_Connection *connect(const vespalib::string &spec) { FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer); ASSERT_TRUE(conn != nullptr); if (conn->OpenChannel(this, FNET_Context()) == nullptr) { conn_lost.countDown(); } - conn->SetCleanupHandler(this); return conn; } ~TransportFixture() override { @@ -131,8 +126,26 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { //----------------------------------------------------------------------------- -TEST_MT_FFF("require that normal connect works", 2, - ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60)) +struct ConnCheck { + uint64_t target; + ConnCheck() : target(FNET_Connection::get_num_connections()) { + EXPECT_EQUAL(target, uint64_t(0)); + } + bool at_target() const { return (FNET_Connection::get_num_connections() == target); }; + bool await(duration max_wait) const { + auto until = saturated_add(steady_clock::now(), max_wait); + while (!at_target() && steady_clock::now() < until) { + std::this_thread::sleep_for(1ms); + } + return at_target(); + } + void await() const { + ASSERT_TRUE(await(3600s)); + } +}; + +TEST_MT_FFFF("require that normal connect works", 2, + ServerSocket("tcp/0"), TransportFixture(), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -144,23 +157,23 @@ TEST_MT_FFF("require that normal connect works", 2, TEST_BARRIER(); conn->Owner()->Close(conn); f2.conn_lost.await(); - EXPECT_TRUE(!f2.conn_deleted.await(short_time)); - conn->SubRef(); - f2.conn_deleted.await(); + EXPECT_TRUE(!f3.await(short_time)); + conn->internal_subref(); + f3.await(); } } -TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) { +TEST_FFF("require that bogus connect fail asynchronously", TransportFixture(), ConnCheck(), TimeBomb(60)) { FNET_Connection *conn = f1.connect("invalid"); f1.conn_lost.await(); - EXPECT_TRUE(!f1.conn_deleted.await(short_time)); - conn->SubRef(); - f1.conn_deleted.await(); + EXPECT_TRUE(!f2.await(short_time)); + conn->internal_subref(); + f2.await(); } -TEST_MT_FFFF("require that async close can be called before async resolve completes", 2, - ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()), - TransportFixture(f2), TimeBomb(60)) +TEST_MT_FFFFF("require that async close can be called before async resolve completes", 2, + ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()), + TransportFixture(f2), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -172,16 +185,16 @@ TEST_MT_FFFF("require that async close can be called before async resolve comple conn->Owner()->Close(conn); f3.conn_lost.await(); f2->release_caller(); - EXPECT_TRUE(!f3.conn_deleted.await(short_time)); - conn->SubRef(); - f3.conn_deleted.await(); + EXPECT_TRUE(!f4.await(short_time)); + conn->internal_subref(); + f4.await(); f1.shutdown(); } } -TEST_MT_FFFF("require that async close during async do_handshake_work works", 2, - ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()), - TransportFixture(f2), TimeBomb(60)) +TEST_MT_FFFFF("require that async close during async do_handshake_work works", 2, + ServerSocket("tcp/0"), std::shared_ptr<BlockingCryptoEngine>(new BlockingCryptoEngine()), + TransportFixture(f2), ConnCheck(), TimeBomb(60)) { if (thread_id == 0) { SocketHandle socket = f1.accept(); @@ -196,10 +209,10 @@ TEST_MT_FFFF("require that async close during async do_handshake_work works", 2, f3.conn_lost.await(); TEST_BARRIER(); // #1 // verify that pending work keeps relevant objects alive - EXPECT_TRUE(!f3.conn_deleted.await(short_time)); + EXPECT_TRUE(!f4.await(short_time)); EXPECT_TRUE(!f2->handshake_socket_deleted.await(short_time)); f2->handshake_work_exit.countDown(); - f3.conn_deleted.await(); + f4.await(); f2->handshake_socket_deleted.await(); } } diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp index 6286ce65657..5908e6a4982 100644 --- a/fnet/src/tests/connection_spread/connection_spread_test.cpp +++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp @@ -84,9 +84,9 @@ TEST_F("require that connections are spread among transport threads", Fixture) f1.wait_for_components(256, 257); check_threads(f1.client, 8, "client"); check_threads(f1.server, 8, "server"); - listener->SubRef(); + listener->internal_subref(); for (FNET_Connection *conn: connections) { - conn->SubRef(); + conn->internal_subref(); } } diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp index 6325c60413a..c5ca6dc6ce9 100644 --- a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp +++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp @@ -36,7 +36,7 @@ struct RpcFixture : FRT_Invokable { } ~RpcFixture() { if (back_conn.load() != nullptr) { - back_conn.load()->SubRef(); + back_conn.load()->internal_subref(); } } uint32_t port() const { return orb.GetListenPort(); } @@ -61,7 +61,7 @@ struct RpcFixture : FRT_Invokable { ASSERT_TRUE(back_conn.load() == nullptr); back_conn.store(req->GetConnection()); ASSERT_TRUE(back_conn.load() != nullptr); - back_conn.load()->AddRef(); + back_conn.load()->internal_addref(); } FRT_Target *meta_connect(uint32_t port) { auto *target = orb.Get2WayTarget(fmt("tcp/localhost:%u", port).c_str()); @@ -69,7 +69,7 @@ struct RpcFixture : FRT_Invokable { req->SetMethodName("connect"); target->InvokeSync(req, 300.0); ASSERT_TRUE(req->CheckReturnTypes("")); - req->SubRef(); + req->internal_subref(); return target; }; static int check_result(FRT_RPCRequest *req, uint64_t expect) { @@ -81,7 +81,7 @@ struct RpcFixture : FRT_Invokable { ASSERT_EQUAL(ret, expect); ++num_ok; } - req->SubRef(); + req->internal_subref(); return num_ok; } static int verify_rpc(FNET_Connection *conn) { @@ -101,7 +101,7 @@ struct RpcFixture : FRT_Invokable { int verify_rpc(FRT_Target *target, uint32_t port) { auto *my_target = connect(port); int num_ok = verify_rpc(target) + verify_rpc(my_target) + verify_rpc(back_conn.load()); - my_target->SubRef(); + my_target->internal_subref(); return num_ok; } }; @@ -135,8 +135,8 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic EXPECT_EQUAL(RpcFixture::verify_rpc(target), 0); // outgoing 2way target should be closed EXPECT_EQUAL(RpcFixture::verify_rpc(client_target), 1); // pure client target should not be closed TEST_BARRIER(); // #5 - target->SubRef(); - client_target->SubRef(); + target->internal_subref(); + client_target->internal_subref(); } else if (thread_id == 1) { // server 2 (talks to client 2) auto self = std::make_unique<RpcFixture>(f1); f3 = self->port(); @@ -146,7 +146,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #3 TEST_BARRIER(); // #4 TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } else if (thread_id == 2) { // client 1 (talks to server 1) auto self = std::make_unique<RpcFixture>(f1); f4 = self->port(); @@ -165,7 +165,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #4 EXPECT_EQUAL(self->verify_rpc(target, f2), 0); TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } else { // client 2 (talks to server 2) ASSERT_EQUAL(thread_id, 3u); auto self = std::make_unique<RpcFixture>(f1); @@ -179,7 +179,7 @@ TEST_MT_FFFFF("require that supervisor can be detached from transport", 4, Basic TEST_BARRIER(); // #4 EXPECT_EQUAL(self->verify_rpc(target, f3), 3); TEST_BARRIER(); // #5 - target->SubRef(); + target->internal_subref(); } } diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp index 2ac706369ae..d6c42ef7790 100644 --- a/fnet/src/tests/frt/method_pt/method_pt.cpp +++ b/fnet/src/tests/frt/method_pt/method_pt.cpp @@ -174,7 +174,7 @@ void finiTest() { delete _complexHandler; delete _mediumHandler; delete _simpleHandler; - _target->SubRef(); + _target->internal_subref(); _server.reset(); } @@ -187,7 +187,7 @@ TEST("method pt") { //-------------------------------- MEDIUM - req->SubRef(); + req->internal_subref(); req = FRT_Supervisor::AllocRPCRequest(); req->SetMethodName("mediumMethod"); _target->InvokeSync(req, 60.0); @@ -195,7 +195,7 @@ TEST("method pt") { //-------------------------------- COMPLEX - req->SubRef(); + req->internal_subref(); req = FRT_Supervisor::AllocRPCRequest(); req->SetMethodName("complexMethod"); _target->InvokeSync(req, 60.0); @@ -213,7 +213,7 @@ TEST("method pt") { fprintf(stderr, "Object inheritance NOT ok for method handlers\n"); } - req->SubRef(); + req->internal_subref(); } //------------------------------------------------------------- diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 41bfb7d06a6..74f69541d8b 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -125,8 +125,8 @@ void perform_test(size_t thread_id, Client &client, Result &result, bool vital = BenchmarkTimer::benchmark(invoke, invoke, 0.5); EXPECT_GREATER_EQUAL(seq, loop_cnt); result.req_per_sec[thread_id] = double(loop_cnt) / t; - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); TEST_BARRIER(); if (thread_id == 0) { result.print(); diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp index d15fca93c0b..812f0a57c5e 100644 --- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp +++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp @@ -86,7 +86,7 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) { auto invoke = [&seq, &target, &req, &fixture, reconnect](){ TT_Sample sample(req_tag); if (reconnect) { - target->SubRef(); + target->internal_subref(); target = fixture.connect(); } req = fixture.orb.AllocRPCRequest(req); @@ -101,8 +101,8 @@ void benchmark_rpc(Fixture &fixture, bool reconnect) { auto before = TimeTracer::now(); double t = BenchmarkTimer::benchmark(invoke, 5.0); auto after = TimeTracer::now(); - target->SubRef(); - req->SubRef(); + target->internal_subref(); + req->internal_subref(); auto stats = TimeTracer::extract().by_time(before, after).by_tag(req_tag.id()).get(); ASSERT_TRUE(stats.size() > 0); std::sort(stats.begin(), stats.end(), DurationCmp()); diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index 17c38ab6e3a..9a0f1778cb6 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -31,14 +31,14 @@ struct Server : public FRT_Invokable void rpc_hook(FRT_RPCRequest *req) { FNET_Connection *conn = req->GetConnection(); - conn->AddRef(); // need to keep it alive + conn->internal_addref(); // need to keep it alive req->Detach(); req->Return(); // will free request channel FRT_RPCRequest *r = orb.AllocRPCRequest(); r->SetMethodName("frt.rpc.ping"); // might re-use request channel before it is unlinked from hashmap orb.InvokeAsync(orb.GetTransport(), conn, r, 5.0, &receptor); - conn->SubRef(); // invocation will now keep the connection alive as needed + conn->internal_subref(); // invocation will now keep the connection alive as needed } }; @@ -61,11 +61,11 @@ TEST("detach return invoke") { } std::this_thread::sleep_for(10ms); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); if (receptor.req.load() != nullptr) { EXPECT_TRUE(!receptor.req.load()->IsError()); - receptor.req.load()->SubRef(); + receptor.req.load()->internal_subref(); } EXPECT_TRUE(receptor.req.load() != nullptr); }; diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index e930c1252bf..f06c7428c22 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -65,7 +65,7 @@ public: } ~MyReq() { if (_req != nullptr) { - _req->SubRef(); + _req->internal_subref(); } } MyReq(const MyReq &rhs) = delete; @@ -331,7 +331,7 @@ public: } ~Fixture() { - _target->SubRef(); + _target->internal_subref(); } }; @@ -410,7 +410,7 @@ TEST_F("require that a bad target gives connection error", Fixture()) { { FRT_Target *bad_target = f1.make_bad_target(); bad_target->InvokeSync(req.borrow(), timeout); - bad_target->SubRef(); + bad_target->internal_subref(); } EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_CONNECTION); } diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp index 2ccb44d03cb..94f57a136a4 100644 --- a/fnet/src/tests/frt/rpc/sharedblob.cpp +++ b/fnet/src/tests/frt/rpc/sharedblob.cpp @@ -122,7 +122,7 @@ struct ServerSampler : public FRT_Invokable dataSet.sample(*req->GetReturn()); // server return before drop // keep request to sample return after drop - req->AddRef(); + req->internal_addref(); serverReq = req; } }; @@ -176,7 +176,7 @@ TEST("testExplicitShared") { req->GetParams()->AddSharedData(&blob); EXPECT_EQUAL(4, blob.refcnt); - req->SubRef(); + req->internal_subref(); EXPECT_EQUAL(1, blob.refcnt); } @@ -262,10 +262,10 @@ TEST("testImplicitShared") { } if (serverSampler.serverReq != 0) { - serverSampler.serverReq->SubRef(); + serverSampler.serverReq->internal_subref(); } - req->SubRef(); - target->SubRef(); + req->internal_subref(); + target->internal_subref(); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index 00075cb75dd..92fe6526a10 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -63,9 +63,9 @@ TEST("info") { fprintf(stderr, "FD_SETSIZE: %d\n", l[2]._intval32); fprintf(stderr, "sizeof(FRT_RPCRequest): %d\n", l[3]._intval32); - target->SubRef(); - local_info->SubRef(); - remote_info->SubRef(); + target->internal_subref(); + local_info->internal_subref(); + remote_info->internal_subref(); }; TEST("size of important objects") diff --git a/fnet/src/tests/printstuff/printstuff_test.cpp b/fnet/src/tests/printstuff/printstuff_test.cpp index a9621728c5a..bd76ad29405 100644 --- a/fnet/src/tests/printstuff/printstuff_test.cpp +++ b/fnet/src/tests/printstuff/printstuff_test.cpp @@ -37,7 +37,7 @@ TEST("rpc packets in a queue") { q2.QueuePacket(&req->getStash().create<FRT_RPCRequestPacket>(req, 0, false), FNET_Context()); q2.Print(); } - req->SubRef(); + req->internal_subref(); } TEST("info") { diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp index a363b1df4c2..eaf2fd71bde 100644 --- a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp +++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp @@ -113,9 +113,9 @@ TEST_FF("transport layers can be run with transport debugger", Fixture(), vespal EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT); ASSERT_TRUE(req6->CheckReturnTypes("l")); EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u); - target->SubRef(); - req4->SubRef(); - req6->SubRef(); + target->internal_subref(); + req4->internal_subref(); + req6->internal_subref(); } TEST_MAIN() { TEST_RUN_ALL(); } |