diff options
Diffstat (limited to 'fnet/src')
-rw-r--r-- | fnet/src/examples/frt/rpc/echo_client.cpp | 5 | ||||
-rw-r--r-- | fnet/src/examples/frt/rpc/rpc_callback_client.cpp | 5 | ||||
-rw-r--r-- | fnet/src/examples/frt/rpc/rpc_callback_server.cpp | 11 | ||||
-rw-r--r-- | fnet/src/examples/frt/rpc/rpc_client.cpp | 5 | ||||
-rw-r--r-- | fnet/src/examples/frt/rpc/rpc_info.cpp | 7 | ||||
-rw-r--r-- | fnet/src/examples/frt/rpc/rpc_invoke.cpp | 5 | ||||
-rw-r--r-- | fnet/src/examples/frt/rpc/rpc_proxy.cpp | 8 | ||||
-rw-r--r-- | fnet/src/examples/frt/rpc/rpc_server.cpp | 6 | ||||
-rw-r--r-- | fnet/src/tests/frt/method_pt/method_pt.cpp | 10 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp | 2 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp | 15 | ||||
-rw-r--r-- | fnet/src/tests/frt/rpc/detach_return_invoke.cpp | 15 | ||||
-rw-r--r-- | fnet/src/tests/frt/rpc/invoke.cpp | 24 | ||||
-rw-r--r-- | fnet/src/tests/frt/rpc/session.cpp | 12 | ||||
-rw-r--r-- | fnet/src/tests/frt/rpc/sharedblob.cpp | 8 | ||||
-rw-r--r-- | fnet/src/tests/info/info.cpp | 5 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/supervisor.cpp | 193 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/supervisor.h | 106 |
18 files changed, 179 insertions, 263 deletions
diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp index 9d73d38cd1e..cc406224135 100644 --- a/fnet/src/examples/frt/rpc/echo_client.cpp +++ b/fnet/src/examples/frt/rpc/echo_client.cpp @@ -19,9 +19,9 @@ EchoClient::Main() printf("usage : echo_client <connectspec>\n"); return 1; } - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); - supervisor.Start(); FRT_Target *target = supervisor.GetTarget(_argv[1]); FRT_RPCRequest *req = supervisor.AllocRPCRequest(); FRT_Values *args = req->GetParams(); @@ -84,7 +84,6 @@ EchoClient::Main() printf("Return values != parameters.\n"); } req->SubRef(); - supervisor.ShutDown(true); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp index 7c6434e870a..726a500cc55 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp @@ -47,9 +47,9 @@ MyApp::Main() } bool ok = true; RPC rpc; - FRT_Supervisor orb; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb = server.supervisor(); rpc.Init(&orb); - orb.Start(); FRT_Target *target = orb.Get2WayTarget(_argv[1]); FRT_RPCRequest *req = orb.AllocRPCRequest(); @@ -102,7 +102,6 @@ MyApp::Main() req->SubRef(); target->SubRef(); - orb.ShutDown(true); return ok ? 0 : 1; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp index ac7b34ebda0..7e0caaba00d 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp @@ -60,11 +60,12 @@ MyApp::Main() return 1; } RPC rpc; - FRT_Supervisor orb; - rpc.Init(&orb); - orb.Listen(_argv[1]); - FNET_SignalShutDown ssd(*orb.GetTransport()); - orb.Main(); + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); + rpc.Init(&supervisor); + supervisor.Listen(_argv[1]); + FNET_SignalShutDown ssd(*supervisor.GetTransport()); + server.wait_finished(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp index cc230d2bc7c..fc1d54d3440 100644 --- a/fnet/src/examples/frt/rpc/rpc_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_client.cpp @@ -19,9 +19,9 @@ RPCClient::Main() printf("usage : rpc_client <connectspec>\n"); return 1; } - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); - supervisor.Start(); FRT_Target *target = supervisor.GetTarget(_argv[1]); const char *str1 = "abc"; @@ -80,7 +80,6 @@ RPCClient::Main() req->SubRef(); target->SubRef(); - supervisor.ShutDown(true); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp index 5f17e69a10d..d90d22d1986 100644 --- a/fnet/src/examples/frt/rpc/rpc_info.cpp +++ b/fnet/src/examples/frt/rpc/rpc_info.cpp @@ -79,11 +79,12 @@ RPCInfo::Main() } bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0); - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); + FRT_Target *target = supervisor.GetTarget(_argv[1]); FRT_RPCRequest *m_list = nullptr; FRT_RPCRequest *info = nullptr; - supervisor.Start(); GetReq(&info, &supervisor); info->SetMethodName("frt.rpc.ping"); @@ -91,7 +92,6 @@ RPCInfo::Main() if (info->IsError()) { fprintf(stderr, "Error talking to %s\n", _argv[1]); FreeReqs(m_list, info); - supervisor.ShutDown(true); return 1; } @@ -129,7 +129,6 @@ RPCInfo::Main() } FreeReqs(m_list, info); target->SubRef(); - supervisor.ShutDown(true); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp index ea34ffa6905..fb82622a537 100644 --- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp +++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp @@ -71,8 +71,8 @@ int RPCClient::run() { int retCode = 0; - FRT_Supervisor supervisor; - supervisor.Start(); + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); int targetArg = 1; int methNameArg = 2; int startOfArgs = 3; @@ -109,7 +109,6 @@ RPCClient::run() } req->SubRef(); target->SubRef(); - supervisor.ShutDown(true); return retCode; } diff --git a/fnet/src/examples/frt/rpc/rpc_proxy.cpp b/fnet/src/examples/frt/rpc/rpc_proxy.cpp index 496190b2b80..fa152dcec10 100644 --- a/fnet/src/examples/frt/rpc/rpc_proxy.cpp +++ b/fnet/src/examples/frt/rpc/rpc_proxy.cpp @@ -224,18 +224,18 @@ App::Main() } bool verbose = (_argc > 3) && (strcmp(_argv[3], "verbose") == 0); - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); RPCProxy proxy(supervisor, _argv[2], verbose); supervisor.GetReflectionManager()->Reset(); supervisor.SetSessionInitHook(FRT_METHOD(RPCProxy::HOOK_Init), &proxy); supervisor.SetSessionDownHook(FRT_METHOD(RPCProxy::HOOK_Down), &proxy); supervisor.SetSessionFiniHook(FRT_METHOD(RPCProxy::HOOK_Fini), &proxy); - supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch), - &proxy); + supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch), &proxy); supervisor.Listen(_argv[1]); FNET_SignalShutDown ssd(*supervisor.GetTransport()); - supervisor.Main(); + server.wait_finished(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp index 03d618133c9..cc3972166ad 100644 --- a/fnet/src/examples/frt/rpc/rpc_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_server.cpp @@ -91,12 +91,12 @@ RPCServer::Main(int argc, char **argv) return 1; } - _supervisor = new FRT_Supervisor(); + fnet::frt::StandaloneFRT server; + _supervisor = &server.supervisor(); InitRPC(_supervisor); _supervisor->Listen(argv[1]); FNET_SignalShutDown ssd(*_supervisor->GetTransport()); - _supervisor->Main(); - delete _supervisor; + server.wait_finished(); return 0; } diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp index 5417fddceeb..608a435bd1d 100644 --- a/fnet/src/tests/frt/method_pt/method_pt.cpp +++ b/fnet/src/tests/frt/method_pt/method_pt.cpp @@ -18,6 +18,7 @@ class ComplexHandler3; Test *_test; +std::unique_ptr<fnet::frt::StandaloneFRT> _server; FRT_Supervisor *_supervisor; FRT_Target *_target; SimpleHandler *_simpleHandler; @@ -176,7 +177,8 @@ public: //------------------------------------------------------------- void initTest() { - _supervisor = new FRT_Supervisor(); + _server = std::make_unique<fnet::frt::StandaloneFRT>(); + _supervisor = &_server->supervisor(); _simpleHandler = new SimpleHandler(); _mediumHandler1 = new MediumHandler1(); _mediumHandler2 = new MediumHandler2(); @@ -200,9 +202,6 @@ void initTest() { _target = _supervisor->GetTarget(spec.c_str()); ASSERT_TRUE(_target != nullptr); - bool startOK = _supervisor->Start(); - ASSERT_TRUE(startOK); - FRT_ReflectionBuilder rb(_supervisor); //------------------------------------------------------------------- @@ -247,7 +246,6 @@ void initTest() { void finiTest() { - _supervisor->ShutDown(true); delete _complexHandler1; delete _complexHandler2; delete _complexHandler3; @@ -256,7 +254,7 @@ void finiTest() { delete _mediumHandler3; delete _simpleHandler; _target->SubRef(); - delete _supervisor; + _server.reset(); } diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 2441ea6eaa0..ed4911175a0 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -15,7 +15,7 @@ struct Rpc : FRT_Invokable { FNET_Transport transport; FRT_Supervisor orb; Rpc(CryptoEngine::SP crypto, size_t num_threads) - : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport, &thread_pool) {} + : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport) {} void start() { ASSERT_TRUE(transport.Start(&thread_pool)); } diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp index 0e5f4712e61..cdb2636a8c1 100644 --- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp +++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp @@ -21,18 +21,19 @@ CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespal TT_Tag req_tag("request"); struct Fixture : FRT_Invokable { - FRT_Supervisor orb; - Fixture(CryptoEngine::SP crypto) : orb(std::move(crypto)) { - ASSERT_TRUE(orb.Listen(0)); + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb; + Fixture(CryptoEngine::SP crypto) + : server(std::move(crypto)), + orb(server.supervisor()) + { + ASSERT_TRUE(orb.Listen(0)); init_rpc(); - ASSERT_TRUE(orb.Start()); } FRT_Target *connect() { return orb.GetTarget(orb.GetListenPort()); } - ~Fixture() { - orb.ShutDown(true); - } + ~Fixture() = default; void init_rpc() { FRT_ReflectionBuilder rb(&orb); rb.DefineMethod("inc", "l", "l", FRT_METHOD(Fixture::rpc_inc), this); diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index ab21c62bb68..43a61cd9bcd 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -39,13 +39,13 @@ struct Server : public FRT_Invokable TEST("detach return invoke") { Receptor receptor; - FRT_Supervisor orb; - Server server(orb, receptor); - ASSERT_TRUE(orb.Listen(0)); - ASSERT_TRUE(orb.Start()); - std::string spec = vespalib::make_string("tcp/localhost:%d", orb.GetListenPort()); - FRT_Target *target = orb.Get2WayTarget(spec.c_str()); - FRT_RPCRequest *req = orb.AllocRPCRequest(); + fnet::frt::StandaloneFRT frtServer; + FRT_Supervisor & supervisor = frtServer.supervisor(); + Server server(supervisor, receptor); + ASSERT_TRUE(supervisor.Listen(0)); + std::string spec = vespalib::make_string("tcp/localhost:%d", supervisor.GetListenPort()); + FRT_Target *target = supervisor.Get2WayTarget(spec.c_str()); + FRT_RPCRequest *req = supervisor.AllocRPCRequest(); req->SetMethodName("hook"); target->InvokeSync(req, 5.0); @@ -58,7 +58,6 @@ TEST("detach return invoke") { } req->SubRef(); target->SubRef(); - orb.ShutDown(true); if (receptor.req != 0) { EXPECT_TRUE(!receptor.req->IsError()); receptor.req->SubRef(); diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index a84e3049704..410a60fa08a 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -249,8 +249,8 @@ public: class Fixture { private: - FRT_Supervisor _client; - FRT_Supervisor _server; + fnet::frt::StandaloneFRT _client; + fnet::frt::StandaloneFRT _server; vespalib::string _peerSpec; FRT_Target *_target; TestRPC _testRPC; @@ -258,7 +258,7 @@ private: public: FRT_Target &target() { return *_target; } - FRT_Target *make_bad_target() { return _client.GetTarget("bogus address"); } + FRT_Target *make_bad_target() { return _client.supervisor().GetTarget("bogus address"); } RequestLatch &detached_req() { return _testRPC.detached_req(); } EchoTest &echo() { return _echoTest; } @@ -267,16 +267,14 @@ public: _server(crypto), _peerSpec(), _target(nullptr), - _testRPC(&_server), - _echoTest(&_server) + _testRPC(&_server.supervisor()), + _echoTest(&_server.supervisor()) { - _client.GetTransport()->SetTCPNoDelay(true); - _server.GetTransport()->SetTCPNoDelay(true); - ASSERT_TRUE(_server.Listen("tcp/0")); - ASSERT_TRUE(_server.Start()); - ASSERT_TRUE(_client.Start()); - _peerSpec = SocketSpec::from_host_port("localhost", _server.GetListenPort()).spec(); - _target = _client.GetTarget(_peerSpec.c_str()); + _client.supervisor().GetTransport()->SetTCPNoDelay(true); + _server.supervisor().GetTransport()->SetTCPNoDelay(true); + ASSERT_TRUE(_server.supervisor().Listen("tcp/0")); + _peerSpec = SocketSpec::from_host_port("localhost", _server.supervisor().GetListenPort()).spec(); + _target = _client.supervisor().GetTarget(_peerSpec.c_str()); //--------------------------------------------------------------------- MyReq req("frt.rpc.ping"); target().InvokeSync(req.borrow(), timeout); @@ -284,8 +282,6 @@ public: } ~Fixture() { - _client.ShutDown(true); - _server.ShutDown(true); _target->SubRef(); } }; diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp index 96d20ae9c18..24cbedb3ff7 100644 --- a/fnet/src/tests/frt/rpc/session.cpp +++ b/fnet/src/tests/frt/rpc/session.cpp @@ -86,14 +86,13 @@ struct RPC : public FRT_Invokable } }; -TEST("session") { - RPC rpc; - FRT_Supervisor orb(crypto); +void testSession(RPC & rpc) { + fnet::frt::StandaloneFRT frt(crypto); + FRT_Supervisor & orb = frt.supervisor(); char spec[64]; rpc.Init(&orb); ASSERT_TRUE(orb.Listen("tcp/0")); sprintf(spec, "tcp/localhost:%d", orb.GetListenPort()); - ASSERT_TRUE(orb.Start()); FRT_Target *target = orb.GetTarget(spec); FRT_RPCRequest *req = orb.AllocRPCRequest(); @@ -122,7 +121,10 @@ TEST("session") { req->SubRef(); target->SubRef(); - orb.ShutDown(true); +} +TEST("session") { + RPC rpc; + testSession(rpc); EXPECT_TRUE(Session::GetCnt() == 0); EXPECT_TRUE(!rpc.bogusFini); }; diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp index a48ecbb1da7..1c0503454c7 100644 --- a/fnet/src/tests/frt/rpc/sharedblob.cpp +++ b/fnet/src/tests/frt/rpc/sharedblob.cpp @@ -118,7 +118,8 @@ struct ServerSampler : public FRT_Invokable }; TEST("testExplicitShared") { - FRT_Supervisor orb; + fnet::frt::StandaloneFRT frt; + FRT_Supervisor & orb = frt.supervisor(); MyBlob blob; FRT_RPCRequest *req = orb.AllocRPCRequest(); @@ -171,7 +172,8 @@ TEST("testExplicitShared") { TEST("testImplicitShared") { DataSet dataSet; - FRT_Supervisor orb; + fnet::frt::StandaloneFRT frt; + FRT_Supervisor & orb = frt.supervisor(); FRT_RPCRequest *req = orb.AllocRPCRequest(); ServerSampler serverSampler(dataSet, req); { @@ -182,7 +184,6 @@ TEST("testImplicitShared") { orb.Listen(0); int port = orb.GetListenPort(); ASSERT_TRUE(port != 0); - orb.Start(); char tmp[64]; snprintf(tmp, sizeof(tmp), "tcp/localhost:%d", port); @@ -255,7 +256,6 @@ TEST("testImplicitShared") { } req->SubRef(); target->SubRef(); - orb.ShutDown(true); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index f76e66c2af6..b91b9fa4b39 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -37,12 +37,12 @@ struct RPC : public FRT_Invokable TEST("info") { RPC rpc; - FRT_Supervisor orb; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb = server.supervisor(); char spec[64]; rpc.Init(&orb); ASSERT_TRUE(orb.Listen("tcp/0")); sprintf(spec, "tcp/localhost:%d", orb.GetListenPort()); - ASSERT_TRUE(orb.Start()); FRT_Target *target = orb.GetTarget(spec); FRT_RPCRequest *local_info = orb.AllocRPCRequest(); @@ -65,7 +65,6 @@ TEST("info") { target->SubRef(); local_info->SubRef(); remote_info->SubRef(); - orb.ShutDown(true); }; TEST("size of important objects") diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index e509223c005..d5c7e0847f7 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -9,57 +9,25 @@ #include <vespa/fnet/connector.h> #include <vespa/fastos/thread.h> -FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport, - FastOS_ThreadPool *threadPool) +FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport) : _transport(transport), - _threadPool(threadPool), - _standAlone(false), _packetFactory(), _packetStreamer(&_packetFactory), _connector(nullptr), _reflectionManager(), _rpcHooks(&_reflectionManager), _connHooks(*this), - _methodMismatchHook(nullptr) + _methodMismatchHook() { _rpcHooks.InitRPC(this); } -FRT_Supervisor::FRT_Supervisor(vespalib::CryptoEngine::SP crypto, - uint32_t threadStackSize, - uint32_t maxThreads) - : _transport(nullptr), - _threadPool(nullptr), - _standAlone(true), - _packetFactory(), - _packetStreamer(&_packetFactory), - _connector(nullptr), - _reflectionManager(), - _rpcHooks(&_reflectionManager), - _connHooks(*this), - _methodMismatchHook(nullptr) -{ - _transport = new FNET_Transport(std::move(crypto), 1); - assert(_transport != nullptr); - if (threadStackSize > 0) { - _threadPool = new FastOS_ThreadPool(threadStackSize, maxThreads); - assert(_threadPool != nullptr); - } - _rpcHooks.InitRPC(this); -} - - FRT_Supervisor::~FRT_Supervisor() { - if (_standAlone) { - delete _transport; - delete _threadPool; - } if (_connector != nullptr) { _connector->SubRef(); } - delete _methodMismatchHook; } FNET_Scheduler * @@ -134,43 +102,35 @@ FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein) void -FRT_Supervisor::SetSessionInitHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler) { _connHooks.SetSessionInitHook(method, handler); } void -FRT_Supervisor::SetSessionDownHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler) { _connHooks.SetSessionDownHook(method, handler); } void -FRT_Supervisor::SetSessionFiniHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::SetSessionFiniHook(FRT_METHOD_PT method, FRT_Invokable *handler) { _connHooks.SetSessionFiniHook(method, handler); } void -FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method, FRT_Invokable *handler) { - delete _methodMismatchHook; - _methodMismatchHook = new FRT_Method("frt.hook.methodMismatch", "*", "*", - method, handler); - assert(_methodMismatchHook != nullptr); + _methodMismatchHook = std::make_unique<FRT_Method>("frt.hook.methodMismatch", "*", "*", method, handler); } void -FRT_Supervisor::InvokeVoid(FNET_Connection *conn, - FRT_RPCRequest *req) +FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req) { if (conn != nullptr) { FNET_Channel *ch = conn->OpenChannel(); @@ -183,11 +143,7 @@ FRT_Supervisor::InvokeVoid(FNET_Connection *conn, void -FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, - FNET_Connection *conn, - FRT_RPCRequest *req, - double timeout, - FRT_IRequestWait *waiter) +FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout, FRT_IRequestWait *waiter) { uint32_t chid; FNET_Packet *packet = req->CreateRequestPacket(true); @@ -209,10 +165,7 @@ FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, void -FRT_Supervisor::InvokeSync(SchedulerPtr scheduler, - FNET_Connection *conn, - FRT_RPCRequest *req, - double timeout) +FRT_Supervisor::InvokeSync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout) { FRT_SingleReqWait waiter; InvokeAsync(scheduler, conn, req, timeout, &waiter); @@ -250,7 +203,7 @@ FNET_IPacketHandler::HP_RetCode FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) { uint32_t pcode = packet->GetPCODE() & 0xffff; // remove flags - FRT_RPCRequest *req = (FRT_RPCRequest *) context._value.VOIDP; + auto *req = (FRT_RPCRequest *) context._value.VOIDP; FRT_RPCInvoker *invoker = nullptr; bool noReply = false; @@ -265,9 +218,9 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) if (req->IsError()) { if (req->GetErrorCode() != FRTE_RPC_BAD_REQUEST - && _methodMismatchHook != nullptr) + && _methodMismatchHook) { - invoker->ForceMethod(_methodMismatchHook); + invoker->ForceMethod(_methodMismatchHook.get()); return (invoker->Invoke()) ? FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL; } @@ -282,40 +235,6 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) } } - -bool -FRT_Supervisor::Start() -{ - assert(_standAlone); - if (_threadPool == nullptr) - return false; - return _transport->Start(_threadPool); -} - - -void -FRT_Supervisor::Main() -{ - assert(_standAlone); - _transport->Main(); -} - - -void -FRT_Supervisor::ShutDown(bool waitFinished) -{ - assert(_standAlone); - _transport->ShutDown(waitFinished); -} - - -void -FRT_Supervisor::WaitFinished() -{ - assert(_standAlone); - _transport->WaitFinished(); -} - //---------------------------------------------------- // RPC Hooks //---------------------------------------------------- @@ -403,57 +322,38 @@ FRT_Supervisor::RPCHooks::RPC_GetMethodInfo(FRT_RPCRequest *req) FRT_Supervisor::ConnHooks::ConnHooks(FRT_Supervisor &parent) : _parent(parent), - _sessionInitHook(nullptr), - _sessionDownHook(nullptr), - _sessionFiniHook(nullptr) + _sessionInitHook(), + _sessionDownHook(), + _sessionFiniHook() { } -FRT_Supervisor::ConnHooks::~ConnHooks() -{ - delete _sessionInitHook; - delete _sessionDownHook; - delete _sessionFiniHook; -} - +FRT_Supervisor::ConnHooks::~ConnHooks() = default; void -FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler) { - delete _sessionInitHook; - _sessionInitHook = new FRT_Method("frt.hook.sessionInit", "", "", - method, handler); - assert(_sessionInitHook != nullptr); + _sessionInitHook = std::make_unique<FRT_Method>("frt.hook.sessionInit", "", "", method, handler); } void -FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler) { - delete _sessionDownHook; - _sessionDownHook = new FRT_Method("frt.hook.sessionDown", "", "", - method, handler); - assert(_sessionDownHook != nullptr); + _sessionDownHook = std::make_unique<FRT_Method>("frt.hook.sessionDown", "", "", method, handler); } void -FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method, FRT_Invokable *handler) { - delete _sessionFiniHook; - _sessionFiniHook = new FRT_Method("frt.hook.sessionFini", "", "", - method, handler); - assert(_sessionFiniHook != nullptr); + _sessionFiniHook = std::make_unique<FRT_Method>("frt.hook.sessionFini", "", "", method, handler); } void -FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook, - FNET_Connection *conn) +FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook, FNET_Connection *conn) { FRT_RPCRequest *req = _parent.AllocRPCRequest(); req->SetMethodName(hook->GetName()); @@ -466,8 +366,8 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel) { FNET_Connection *conn = channel->GetConnection(); conn->SetCleanupHandler(this); - if (_sessionInitHook != nullptr) { - InvokeHook(_sessionInitHook, conn); + if (_sessionInitHook) { + InvokeHook(_sessionInitHook.get(), conn); } channel->SetHandler(this); channel->SetContext(channel); @@ -476,16 +376,15 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel) FNET_IPacketHandler::HP_RetCode -FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, - FNET_Context context) +FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, FNET_Context context) { if (!packet->IsChannelLostCMD()) { packet->Free(); return FNET_KEEP_CHANNEL; } FNET_Channel *ch = context._value.CHANNEL; - if (_sessionDownHook != nullptr) { - InvokeHook(_sessionDownHook, ch->GetConnection()); + if (_sessionDownHook) { + InvokeHook(_sessionDownHook.get(), ch->GetConnection()); } return FNET_FREE_CHANNEL; } @@ -494,8 +393,8 @@ FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, void FRT_Supervisor::ConnHooks::Cleanup(FNET_Connection *conn) { - if (_sessionFiniHook != nullptr) { - InvokeHook(_sessionFiniHook, conn); + if (_sessionFiniHook) { + InvokeHook(_sessionFiniHook.get(), conn); } } @@ -506,3 +405,33 @@ FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_Transport *transport) FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_thread) : ptr(transport_thread->GetScheduler()) { } + +namespace fnet::frt { + +StandaloneFRT::StandaloneFRT() + : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())) +{ + _transport->Start(_threadPool.get()); +} + +StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto) + : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>(std::move(crypto), 1)), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())) +{ + _transport->Start(_threadPool.get()); +} + +StandaloneFRT::~StandaloneFRT() +{ + _transport->ShutDown(true); +} + +void +StandaloneFRT::wait_finished() const { + _transport->WaitFinished(); +} + +} diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h index dc7fb496239..1247cb08402 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.h +++ b/fnet/src/vespa/fnet/frt/supervisor.h @@ -9,7 +9,6 @@ #include <vespa/fnet/ipackethandler.h> #include <vespa/fnet/connection.h> #include <vespa/fnet/simplepacketstreamer.h> -#include <vespa/vespalib/net/crypto_engine.h> class FNET_Transport; class FRT_Target; @@ -18,6 +17,9 @@ class FNET_Scheduler; class FRT_RPCInvoker; class FRT_IRequestWait; +namespace vespalib { class CryptoEngine; } + + class FRT_Supervisor : public FNET_IServerAdapter, public FNET_IPacketHandler { @@ -26,12 +28,10 @@ public: { private: FRT_ReflectionManager *_reflectionManager; - - RPCHooks(const RPCHooks &); - RPCHooks &operator=(const RPCHooks &); - public: - RPCHooks(FRT_ReflectionManager *reflect) + RPCHooks(const RPCHooks &) = delete; + RPCHooks &operator=(const RPCHooks &) = delete; + explicit RPCHooks(FRT_ReflectionManager *reflect) : _reflectionManager(reflect) {} void InitRPC(FRT_Supervisor *supervisor); @@ -45,17 +45,15 @@ public: public FNET_IPacketHandler { private: - FRT_Supervisor &_parent; - FRT_Method *_sessionInitHook; - FRT_Method *_sessionDownHook; - FRT_Method *_sessionFiniHook; - - ConnHooks(const ConnHooks &); - ConnHooks &operator=(const ConnHooks &); - + FRT_Supervisor &_parent; + std::unique_ptr<FRT_Method> _sessionInitHook; + std::unique_ptr<FRT_Method> _sessionDownHook; + std::unique_ptr<FRT_Method> _sessionFiniHook; public: - ConnHooks(FRT_Supervisor &parent); - ~ConnHooks(); + ConnHooks(const ConnHooks &) = delete; + ConnHooks &operator=(const ConnHooks &) = delete; + explicit ConnHooks(FRT_Supervisor &parent); + ~ConnHooks() override; void SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler); void SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler); @@ -67,32 +65,23 @@ public: }; private: - FNET_Transport *_transport; - FastOS_ThreadPool *_threadPool; - bool _standAlone; - - FRT_PacketFactory _packetFactory; - FNET_SimplePacketStreamer _packetStreamer; - FNET_Connector *_connector; - FRT_ReflectionManager _reflectionManager; - RPCHooks _rpcHooks; - ConnHooks _connHooks; - FRT_Method *_methodMismatchHook; - - FRT_Supervisor(const FRT_Supervisor &); - FRT_Supervisor &operator=(const FRT_Supervisor &); + FNET_Transport *_transport; + FRT_PacketFactory _packetFactory; + FNET_SimplePacketStreamer _packetStreamer; + FNET_Connector *_connector; + FRT_ReflectionManager _reflectionManager; + RPCHooks _rpcHooks; + ConnHooks _connHooks; + std::unique_ptr<FRT_Method> _methodMismatchHook; public: - FRT_Supervisor(FNET_Transport *transport, FastOS_ThreadPool *threadPool); - FRT_Supervisor(vespalib::CryptoEngine::SP crypto, uint32_t threadStackSize = 65000, uint32_t maxThreads = 0); - FRT_Supervisor(uint32_t threadStackSize = 65000, uint32_t maxThreads = 0) - : FRT_Supervisor(vespalib::CryptoEngine::get_default(), threadStackSize, maxThreads) {} - virtual ~FRT_Supervisor(); + explicit FRT_Supervisor(FNET_Transport *transport); + FRT_Supervisor(const FRT_Supervisor &) = delete; + FRT_Supervisor &operator=(const FRT_Supervisor &) = delete; + ~FRT_Supervisor() override; - bool StandAlone() { return _standAlone; } FNET_Transport *GetTransport() { return _transport; } FNET_Scheduler *GetScheduler(); - FastOS_ThreadPool *GetThreadPool() { return _threadPool; } FRT_ReflectionManager *GetReflectionManager() { return &_reflectionManager; } bool Listen(const char *spec); @@ -100,8 +89,7 @@ public: uint32_t GetListenPort() const; FRT_Target *GetTarget(const char *spec); - FRT_Target *Get2WayTarget(const char *spec, - FNET_Context connContext = FNET_Context()); + FRT_Target *Get2WayTarget(const char *spec, FNET_Context connContext = FNET_Context()); FRT_Target *GetTarget(int port); FRT_RPCRequest *AllocRPCRequest(FRT_RPCRequest *tradein = nullptr); @@ -120,17 +108,10 @@ public: }; // methods for performing rpc invocations - static void InvokeVoid(FNET_Connection *conn, - FRT_RPCRequest *req); - static void InvokeAsync(SchedulerPtr scheduler, - FNET_Connection *conn, - FRT_RPCRequest *req, - double timeout, - FRT_IRequestWait *waiter); - static void InvokeSync(SchedulerPtr scheduler, - FNET_Connection *conn, - FRT_RPCRequest *req, - double timeout); + static void InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req); + static void InvokeSync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout); + static void InvokeAsync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout, FRT_IRequestWait *waiter); + // FNET ServerAdapter Interface bool InitAdminChannel(FNET_Channel *channel) override; @@ -138,11 +119,26 @@ public: // Packet Handling HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override; +}; - // Methods for controlling transport object in standalone mode - bool Start(); - void Main(); - void ShutDown(bool waitFinished); - void WaitFinished(); +namespace fnet::frt { + +/** + * This is a simple class that makes it easy to test RPC. + * Normally you do not want use it in production code as it hides your possibilites and responsibilities. + */ +class StandaloneFRT { +public: + StandaloneFRT(); + explicit StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto); + ~StandaloneFRT(); + FRT_Supervisor & supervisor() { return *_supervisor; } + // TODO Remove this method as it is a relic from the ancient non-threaded world. + void wait_finished() const; +private: + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _supervisor; }; +} |