summaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-05-10 11:25:48 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-05-10 12:25:38 +0000
commit4412aace869986be3a1060f78f367841353d3384 (patch)
treef4b5e1f6da5eaf1563f3b2fd64779800acfd5796 /fnet
parent840d4e0578dc627b75bcd0050f1b253e84cc30ed (diff)
Simplify the supervisor responsibility
Diffstat (limited to 'fnet')
-rw-r--r--fnet/src/examples/frt/rpc/echo_client.cpp5
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_client.cpp5
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_server.cpp11
-rw-r--r--fnet/src/examples/frt/rpc/rpc_client.cpp5
-rw-r--r--fnet/src/examples/frt/rpc/rpc_info.cpp7
-rw-r--r--fnet/src/examples/frt/rpc/rpc_invoke.cpp5
-rw-r--r--fnet/src/examples/frt/rpc/rpc_proxy.cpp8
-rw-r--r--fnet/src/examples/frt/rpc/rpc_server.cpp6
-rw-r--r--fnet/src/tests/frt/method_pt/method_pt.cpp10
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp2
-rw-r--r--fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp15
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp15
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp24
-rw-r--r--fnet/src/tests/frt/rpc/session.cpp12
-rw-r--r--fnet/src/tests/frt/rpc/sharedblob.cpp8
-rw-r--r--fnet/src/tests/info/info.cpp5
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp193
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.h106
18 files changed, 179 insertions, 263 deletions
diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp
index 9d73d38cd1e..cc406224135 100644
--- a/fnet/src/examples/frt/rpc/echo_client.cpp
+++ b/fnet/src/examples/frt/rpc/echo_client.cpp
@@ -19,9 +19,9 @@ EchoClient::Main()
printf("usage : echo_client <connectspec>\n");
return 1;
}
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
- supervisor.Start();
FRT_Target *target = supervisor.GetTarget(_argv[1]);
FRT_RPCRequest *req = supervisor.AllocRPCRequest();
FRT_Values *args = req->GetParams();
@@ -84,7 +84,6 @@ EchoClient::Main()
printf("Return values != parameters.\n");
}
req->SubRef();
- supervisor.ShutDown(true);
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
index 7c6434e870a..726a500cc55 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
@@ -47,9 +47,9 @@ MyApp::Main()
}
bool ok = true;
RPC rpc;
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb = server.supervisor();
rpc.Init(&orb);
- orb.Start();
FRT_Target *target = orb.Get2WayTarget(_argv[1]);
FRT_RPCRequest *req = orb.AllocRPCRequest();
@@ -102,7 +102,6 @@ MyApp::Main()
req->SubRef();
target->SubRef();
- orb.ShutDown(true);
return ok ? 0 : 1;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
index ac7b34ebda0..7e0caaba00d 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
@@ -60,11 +60,12 @@ MyApp::Main()
return 1;
}
RPC rpc;
- FRT_Supervisor orb;
- rpc.Init(&orb);
- orb.Listen(_argv[1]);
- FNET_SignalShutDown ssd(*orb.GetTransport());
- orb.Main();
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
+ rpc.Init(&supervisor);
+ supervisor.Listen(_argv[1]);
+ FNET_SignalShutDown ssd(*supervisor.GetTransport());
+ server.wait_finished();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp
index cc230d2bc7c..fc1d54d3440 100644
--- a/fnet/src/examples/frt/rpc/rpc_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_client.cpp
@@ -19,9 +19,9 @@ RPCClient::Main()
printf("usage : rpc_client <connectspec>\n");
return 1;
}
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
- supervisor.Start();
FRT_Target *target = supervisor.GetTarget(_argv[1]);
const char *str1 = "abc";
@@ -80,7 +80,6 @@ RPCClient::Main()
req->SubRef();
target->SubRef();
- supervisor.ShutDown(true);
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp
index 5f17e69a10d..d90d22d1986 100644
--- a/fnet/src/examples/frt/rpc/rpc_info.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_info.cpp
@@ -79,11 +79,12 @@ RPCInfo::Main()
}
bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0);
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
+
FRT_Target *target = supervisor.GetTarget(_argv[1]);
FRT_RPCRequest *m_list = nullptr;
FRT_RPCRequest *info = nullptr;
- supervisor.Start();
GetReq(&info, &supervisor);
info->SetMethodName("frt.rpc.ping");
@@ -91,7 +92,6 @@ RPCInfo::Main()
if (info->IsError()) {
fprintf(stderr, "Error talking to %s\n", _argv[1]);
FreeReqs(m_list, info);
- supervisor.ShutDown(true);
return 1;
}
@@ -129,7 +129,6 @@ RPCInfo::Main()
}
FreeReqs(m_list, info);
target->SubRef();
- supervisor.ShutDown(true);
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
index ea34ffa6905..fb82622a537 100644
--- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
@@ -71,8 +71,8 @@ int
RPCClient::run()
{
int retCode = 0;
- FRT_Supervisor supervisor;
- supervisor.Start();
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
int targetArg = 1;
int methNameArg = 2;
int startOfArgs = 3;
@@ -109,7 +109,6 @@ RPCClient::run()
}
req->SubRef();
target->SubRef();
- supervisor.ShutDown(true);
return retCode;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_proxy.cpp b/fnet/src/examples/frt/rpc/rpc_proxy.cpp
index 496190b2b80..fa152dcec10 100644
--- a/fnet/src/examples/frt/rpc/rpc_proxy.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_proxy.cpp
@@ -224,18 +224,18 @@ App::Main()
}
bool verbose = (_argc > 3) && (strcmp(_argv[3], "verbose") == 0);
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
RPCProxy proxy(supervisor, _argv[2], verbose);
supervisor.GetReflectionManager()->Reset();
supervisor.SetSessionInitHook(FRT_METHOD(RPCProxy::HOOK_Init), &proxy);
supervisor.SetSessionDownHook(FRT_METHOD(RPCProxy::HOOK_Down), &proxy);
supervisor.SetSessionFiniHook(FRT_METHOD(RPCProxy::HOOK_Fini), &proxy);
- supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch),
- &proxy);
+ supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch), &proxy);
supervisor.Listen(_argv[1]);
FNET_SignalShutDown ssd(*supervisor.GetTransport());
- supervisor.Main();
+ server.wait_finished();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp
index 03d618133c9..cc3972166ad 100644
--- a/fnet/src/examples/frt/rpc/rpc_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_server.cpp
@@ -91,12 +91,12 @@ RPCServer::Main(int argc, char **argv)
return 1;
}
- _supervisor = new FRT_Supervisor();
+ fnet::frt::StandaloneFRT server;
+ _supervisor = &server.supervisor();
InitRPC(_supervisor);
_supervisor->Listen(argv[1]);
FNET_SignalShutDown ssd(*_supervisor->GetTransport());
- _supervisor->Main();
- delete _supervisor;
+ server.wait_finished();
return 0;
}
diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp
index 5417fddceeb..608a435bd1d 100644
--- a/fnet/src/tests/frt/method_pt/method_pt.cpp
+++ b/fnet/src/tests/frt/method_pt/method_pt.cpp
@@ -18,6 +18,7 @@ class ComplexHandler3;
Test *_test;
+std::unique_ptr<fnet::frt::StandaloneFRT> _server;
FRT_Supervisor *_supervisor;
FRT_Target *_target;
SimpleHandler *_simpleHandler;
@@ -176,7 +177,8 @@ public:
//-------------------------------------------------------------
void initTest() {
- _supervisor = new FRT_Supervisor();
+ _server = std::make_unique<fnet::frt::StandaloneFRT>();
+ _supervisor = &_server->supervisor();
_simpleHandler = new SimpleHandler();
_mediumHandler1 = new MediumHandler1();
_mediumHandler2 = new MediumHandler2();
@@ -200,9 +202,6 @@ void initTest() {
_target = _supervisor->GetTarget(spec.c_str());
ASSERT_TRUE(_target != nullptr);
- bool startOK = _supervisor->Start();
- ASSERT_TRUE(startOK);
-
FRT_ReflectionBuilder rb(_supervisor);
//-------------------------------------------------------------------
@@ -247,7 +246,6 @@ void initTest() {
void finiTest() {
- _supervisor->ShutDown(true);
delete _complexHandler1;
delete _complexHandler2;
delete _complexHandler3;
@@ -256,7 +254,7 @@ void finiTest() {
delete _mediumHandler3;
delete _simpleHandler;
_target->SubRef();
- delete _supervisor;
+ _server.reset();
}
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index 2441ea6eaa0..ed4911175a0 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -15,7 +15,7 @@ struct Rpc : FRT_Invokable {
FNET_Transport transport;
FRT_Supervisor orb;
Rpc(CryptoEngine::SP crypto, size_t num_threads)
- : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport, &thread_pool) {}
+ : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport) {}
void start() {
ASSERT_TRUE(transport.Start(&thread_pool));
}
diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
index 0e5f4712e61..cdb2636a8c1 100644
--- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
@@ -21,18 +21,19 @@ CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespal
TT_Tag req_tag("request");
struct Fixture : FRT_Invokable {
- FRT_Supervisor orb;
- Fixture(CryptoEngine::SP crypto) : orb(std::move(crypto)) {
- ASSERT_TRUE(orb.Listen(0));
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb;
+ Fixture(CryptoEngine::SP crypto)
+ : server(std::move(crypto)),
+ orb(server.supervisor())
+ {
+ ASSERT_TRUE(orb.Listen(0));
init_rpc();
- ASSERT_TRUE(orb.Start());
}
FRT_Target *connect() {
return orb.GetTarget(orb.GetListenPort());
}
- ~Fixture() {
- orb.ShutDown(true);
- }
+ ~Fixture() = default;
void init_rpc() {
FRT_ReflectionBuilder rb(&orb);
rb.DefineMethod("inc", "l", "l", FRT_METHOD(Fixture::rpc_inc), this);
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index ab21c62bb68..43a61cd9bcd 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -39,13 +39,13 @@ struct Server : public FRT_Invokable
TEST("detach return invoke") {
Receptor receptor;
- FRT_Supervisor orb;
- Server server(orb, receptor);
- ASSERT_TRUE(orb.Listen(0));
- ASSERT_TRUE(orb.Start());
- std::string spec = vespalib::make_string("tcp/localhost:%d", orb.GetListenPort());
- FRT_Target *target = orb.Get2WayTarget(spec.c_str());
- FRT_RPCRequest *req = orb.AllocRPCRequest();
+ fnet::frt::StandaloneFRT frtServer;
+ FRT_Supervisor & supervisor = frtServer.supervisor();
+ Server server(supervisor, receptor);
+ ASSERT_TRUE(supervisor.Listen(0));
+ std::string spec = vespalib::make_string("tcp/localhost:%d", supervisor.GetListenPort());
+ FRT_Target *target = supervisor.Get2WayTarget(spec.c_str());
+ FRT_RPCRequest *req = supervisor.AllocRPCRequest();
req->SetMethodName("hook");
target->InvokeSync(req, 5.0);
@@ -58,7 +58,6 @@ TEST("detach return invoke") {
}
req->SubRef();
target->SubRef();
- orb.ShutDown(true);
if (receptor.req != 0) {
EXPECT_TRUE(!receptor.req->IsError());
receptor.req->SubRef();
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index a84e3049704..410a60fa08a 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -249,8 +249,8 @@ public:
class Fixture
{
private:
- FRT_Supervisor _client;
- FRT_Supervisor _server;
+ fnet::frt::StandaloneFRT _client;
+ fnet::frt::StandaloneFRT _server;
vespalib::string _peerSpec;
FRT_Target *_target;
TestRPC _testRPC;
@@ -258,7 +258,7 @@ private:
public:
FRT_Target &target() { return *_target; }
- FRT_Target *make_bad_target() { return _client.GetTarget("bogus address"); }
+ FRT_Target *make_bad_target() { return _client.supervisor().GetTarget("bogus address"); }
RequestLatch &detached_req() { return _testRPC.detached_req(); }
EchoTest &echo() { return _echoTest; }
@@ -267,16 +267,14 @@ public:
_server(crypto),
_peerSpec(),
_target(nullptr),
- _testRPC(&_server),
- _echoTest(&_server)
+ _testRPC(&_server.supervisor()),
+ _echoTest(&_server.supervisor())
{
- _client.GetTransport()->SetTCPNoDelay(true);
- _server.GetTransport()->SetTCPNoDelay(true);
- ASSERT_TRUE(_server.Listen("tcp/0"));
- ASSERT_TRUE(_server.Start());
- ASSERT_TRUE(_client.Start());
- _peerSpec = SocketSpec::from_host_port("localhost", _server.GetListenPort()).spec();
- _target = _client.GetTarget(_peerSpec.c_str());
+ _client.supervisor().GetTransport()->SetTCPNoDelay(true);
+ _server.supervisor().GetTransport()->SetTCPNoDelay(true);
+ ASSERT_TRUE(_server.supervisor().Listen("tcp/0"));
+ _peerSpec = SocketSpec::from_host_port("localhost", _server.supervisor().GetListenPort()).spec();
+ _target = _client.supervisor().GetTarget(_peerSpec.c_str());
//---------------------------------------------------------------------
MyReq req("frt.rpc.ping");
target().InvokeSync(req.borrow(), timeout);
@@ -284,8 +282,6 @@ public:
}
~Fixture() {
- _client.ShutDown(true);
- _server.ShutDown(true);
_target->SubRef();
}
};
diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp
index 96d20ae9c18..24cbedb3ff7 100644
--- a/fnet/src/tests/frt/rpc/session.cpp
+++ b/fnet/src/tests/frt/rpc/session.cpp
@@ -86,14 +86,13 @@ struct RPC : public FRT_Invokable
}
};
-TEST("session") {
- RPC rpc;
- FRT_Supervisor orb(crypto);
+void testSession(RPC & rpc) {
+ fnet::frt::StandaloneFRT frt(crypto);
+ FRT_Supervisor & orb = frt.supervisor();
char spec[64];
rpc.Init(&orb);
ASSERT_TRUE(orb.Listen("tcp/0"));
sprintf(spec, "tcp/localhost:%d", orb.GetListenPort());
- ASSERT_TRUE(orb.Start());
FRT_Target *target = orb.GetTarget(spec);
FRT_RPCRequest *req = orb.AllocRPCRequest();
@@ -122,7 +121,10 @@ TEST("session") {
req->SubRef();
target->SubRef();
- orb.ShutDown(true);
+}
+TEST("session") {
+ RPC rpc;
+ testSession(rpc);
EXPECT_TRUE(Session::GetCnt() == 0);
EXPECT_TRUE(!rpc.bogusFini);
};
diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp
index a48ecbb1da7..1c0503454c7 100644
--- a/fnet/src/tests/frt/rpc/sharedblob.cpp
+++ b/fnet/src/tests/frt/rpc/sharedblob.cpp
@@ -118,7 +118,8 @@ struct ServerSampler : public FRT_Invokable
};
TEST("testExplicitShared") {
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT frt;
+ FRT_Supervisor & orb = frt.supervisor();
MyBlob blob;
FRT_RPCRequest *req = orb.AllocRPCRequest();
@@ -171,7 +172,8 @@ TEST("testExplicitShared") {
TEST("testImplicitShared") {
DataSet dataSet;
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT frt;
+ FRT_Supervisor & orb = frt.supervisor();
FRT_RPCRequest *req = orb.AllocRPCRequest();
ServerSampler serverSampler(dataSet, req);
{
@@ -182,7 +184,6 @@ TEST("testImplicitShared") {
orb.Listen(0);
int port = orb.GetListenPort();
ASSERT_TRUE(port != 0);
- orb.Start();
char tmp[64];
snprintf(tmp, sizeof(tmp), "tcp/localhost:%d", port);
@@ -255,7 +256,6 @@ TEST("testImplicitShared") {
}
req->SubRef();
target->SubRef();
- orb.ShutDown(true);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index f76e66c2af6..b91b9fa4b39 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -37,12 +37,12 @@ struct RPC : public FRT_Invokable
TEST("info") {
RPC rpc;
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb = server.supervisor();
char spec[64];
rpc.Init(&orb);
ASSERT_TRUE(orb.Listen("tcp/0"));
sprintf(spec, "tcp/localhost:%d", orb.GetListenPort());
- ASSERT_TRUE(orb.Start());
FRT_Target *target = orb.GetTarget(spec);
FRT_RPCRequest *local_info = orb.AllocRPCRequest();
@@ -65,7 +65,6 @@ TEST("info") {
target->SubRef();
local_info->SubRef();
remote_info->SubRef();
- orb.ShutDown(true);
};
TEST("size of important objects")
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index e509223c005..d5c7e0847f7 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -9,57 +9,25 @@
#include <vespa/fnet/connector.h>
#include <vespa/fastos/thread.h>
-FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport,
- FastOS_ThreadPool *threadPool)
+FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport)
: _transport(transport),
- _threadPool(threadPool),
- _standAlone(false),
_packetFactory(),
_packetStreamer(&_packetFactory),
_connector(nullptr),
_reflectionManager(),
_rpcHooks(&_reflectionManager),
_connHooks(*this),
- _methodMismatchHook(nullptr)
+ _methodMismatchHook()
{
_rpcHooks.InitRPC(this);
}
-FRT_Supervisor::FRT_Supervisor(vespalib::CryptoEngine::SP crypto,
- uint32_t threadStackSize,
- uint32_t maxThreads)
- : _transport(nullptr),
- _threadPool(nullptr),
- _standAlone(true),
- _packetFactory(),
- _packetStreamer(&_packetFactory),
- _connector(nullptr),
- _reflectionManager(),
- _rpcHooks(&_reflectionManager),
- _connHooks(*this),
- _methodMismatchHook(nullptr)
-{
- _transport = new FNET_Transport(std::move(crypto), 1);
- assert(_transport != nullptr);
- if (threadStackSize > 0) {
- _threadPool = new FastOS_ThreadPool(threadStackSize, maxThreads);
- assert(_threadPool != nullptr);
- }
- _rpcHooks.InitRPC(this);
-}
-
-
FRT_Supervisor::~FRT_Supervisor()
{
- if (_standAlone) {
- delete _transport;
- delete _threadPool;
- }
if (_connector != nullptr) {
_connector->SubRef();
}
- delete _methodMismatchHook;
}
FNET_Scheduler *
@@ -134,43 +102,35 @@ FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein)
void
-FRT_Supervisor::SetSessionInitHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
_connHooks.SetSessionInitHook(method, handler);
}
void
-FRT_Supervisor::SetSessionDownHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
_connHooks.SetSessionDownHook(method, handler);
}
void
-FRT_Supervisor::SetSessionFiniHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::SetSessionFiniHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
_connHooks.SetSessionFiniHook(method, handler);
}
void
-FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
- delete _methodMismatchHook;
- _methodMismatchHook = new FRT_Method("frt.hook.methodMismatch", "*", "*",
- method, handler);
- assert(_methodMismatchHook != nullptr);
+ _methodMismatchHook = std::make_unique<FRT_Method>("frt.hook.methodMismatch", "*", "*", method, handler);
}
void
-FRT_Supervisor::InvokeVoid(FNET_Connection *conn,
- FRT_RPCRequest *req)
+FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req)
{
if (conn != nullptr) {
FNET_Channel *ch = conn->OpenChannel();
@@ -183,11 +143,7 @@ FRT_Supervisor::InvokeVoid(FNET_Connection *conn,
void
-FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler,
- FNET_Connection *conn,
- FRT_RPCRequest *req,
- double timeout,
- FRT_IRequestWait *waiter)
+FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout, FRT_IRequestWait *waiter)
{
uint32_t chid;
FNET_Packet *packet = req->CreateRequestPacket(true);
@@ -209,10 +165,7 @@ FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler,
void
-FRT_Supervisor::InvokeSync(SchedulerPtr scheduler,
- FNET_Connection *conn,
- FRT_RPCRequest *req,
- double timeout)
+FRT_Supervisor::InvokeSync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout)
{
FRT_SingleReqWait waiter;
InvokeAsync(scheduler, conn, req, timeout, &waiter);
@@ -250,7 +203,7 @@ FNET_IPacketHandler::HP_RetCode
FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context)
{
uint32_t pcode = packet->GetPCODE() & 0xffff; // remove flags
- FRT_RPCRequest *req = (FRT_RPCRequest *) context._value.VOIDP;
+ auto *req = (FRT_RPCRequest *) context._value.VOIDP;
FRT_RPCInvoker *invoker = nullptr;
bool noReply = false;
@@ -265,9 +218,9 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context)
if (req->IsError()) {
if (req->GetErrorCode() != FRTE_RPC_BAD_REQUEST
- && _methodMismatchHook != nullptr)
+ && _methodMismatchHook)
{
- invoker->ForceMethod(_methodMismatchHook);
+ invoker->ForceMethod(_methodMismatchHook.get());
return (invoker->Invoke()) ?
FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL;
}
@@ -282,40 +235,6 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context)
}
}
-
-bool
-FRT_Supervisor::Start()
-{
- assert(_standAlone);
- if (_threadPool == nullptr)
- return false;
- return _transport->Start(_threadPool);
-}
-
-
-void
-FRT_Supervisor::Main()
-{
- assert(_standAlone);
- _transport->Main();
-}
-
-
-void
-FRT_Supervisor::ShutDown(bool waitFinished)
-{
- assert(_standAlone);
- _transport->ShutDown(waitFinished);
-}
-
-
-void
-FRT_Supervisor::WaitFinished()
-{
- assert(_standAlone);
- _transport->WaitFinished();
-}
-
//----------------------------------------------------
// RPC Hooks
//----------------------------------------------------
@@ -403,57 +322,38 @@ FRT_Supervisor::RPCHooks::RPC_GetMethodInfo(FRT_RPCRequest *req)
FRT_Supervisor::ConnHooks::ConnHooks(FRT_Supervisor &parent)
: _parent(parent),
- _sessionInitHook(nullptr),
- _sessionDownHook(nullptr),
- _sessionFiniHook(nullptr)
+ _sessionInitHook(),
+ _sessionDownHook(),
+ _sessionFiniHook()
{
}
-FRT_Supervisor::ConnHooks::~ConnHooks()
-{
- delete _sessionInitHook;
- delete _sessionDownHook;
- delete _sessionFiniHook;
-}
-
+FRT_Supervisor::ConnHooks::~ConnHooks() = default;
void
-FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
- delete _sessionInitHook;
- _sessionInitHook = new FRT_Method("frt.hook.sessionInit", "", "",
- method, handler);
- assert(_sessionInitHook != nullptr);
+ _sessionInitHook = std::make_unique<FRT_Method>("frt.hook.sessionInit", "", "", method, handler);
}
void
-FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
- delete _sessionDownHook;
- _sessionDownHook = new FRT_Method("frt.hook.sessionDown", "", "",
- method, handler);
- assert(_sessionDownHook != nullptr);
+ _sessionDownHook = std::make_unique<FRT_Method>("frt.hook.sessionDown", "", "", method, handler);
}
void
-FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
- delete _sessionFiniHook;
- _sessionFiniHook = new FRT_Method("frt.hook.sessionFini", "", "",
- method, handler);
- assert(_sessionFiniHook != nullptr);
+ _sessionFiniHook = std::make_unique<FRT_Method>("frt.hook.sessionFini", "", "", method, handler);
}
void
-FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook,
- FNET_Connection *conn)
+FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook, FNET_Connection *conn)
{
FRT_RPCRequest *req = _parent.AllocRPCRequest();
req->SetMethodName(hook->GetName());
@@ -466,8 +366,8 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel)
{
FNET_Connection *conn = channel->GetConnection();
conn->SetCleanupHandler(this);
- if (_sessionInitHook != nullptr) {
- InvokeHook(_sessionInitHook, conn);
+ if (_sessionInitHook) {
+ InvokeHook(_sessionInitHook.get(), conn);
}
channel->SetHandler(this);
channel->SetContext(channel);
@@ -476,16 +376,15 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel)
FNET_IPacketHandler::HP_RetCode
-FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet,
- FNET_Context context)
+FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, FNET_Context context)
{
if (!packet->IsChannelLostCMD()) {
packet->Free();
return FNET_KEEP_CHANNEL;
}
FNET_Channel *ch = context._value.CHANNEL;
- if (_sessionDownHook != nullptr) {
- InvokeHook(_sessionDownHook, ch->GetConnection());
+ if (_sessionDownHook) {
+ InvokeHook(_sessionDownHook.get(), ch->GetConnection());
}
return FNET_FREE_CHANNEL;
}
@@ -494,8 +393,8 @@ FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet,
void
FRT_Supervisor::ConnHooks::Cleanup(FNET_Connection *conn)
{
- if (_sessionFiniHook != nullptr) {
- InvokeHook(_sessionFiniHook, conn);
+ if (_sessionFiniHook) {
+ InvokeHook(_sessionFiniHook.get(), conn);
}
}
@@ -506,3 +405,33 @@ FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_Transport *transport)
FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_thread)
: ptr(transport_thread->GetScheduler())
{ }
+
+namespace fnet::frt {
+
+StandaloneFRT::StandaloneFRT()
+ : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
+{
+ _transport->Start(_threadPool.get());
+}
+
+StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto)
+ : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>(std::move(crypto), 1)),
+ _supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
+{
+ _transport->Start(_threadPool.get());
+}
+
+StandaloneFRT::~StandaloneFRT()
+{
+ _transport->ShutDown(true);
+}
+
+void
+StandaloneFRT::wait_finished() const {
+ _transport->WaitFinished();
+}
+
+}
diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h
index dc7fb496239..1247cb08402 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.h
+++ b/fnet/src/vespa/fnet/frt/supervisor.h
@@ -9,7 +9,6 @@
#include <vespa/fnet/ipackethandler.h>
#include <vespa/fnet/connection.h>
#include <vespa/fnet/simplepacketstreamer.h>
-#include <vespa/vespalib/net/crypto_engine.h>
class FNET_Transport;
class FRT_Target;
@@ -18,6 +17,9 @@ class FNET_Scheduler;
class FRT_RPCInvoker;
class FRT_IRequestWait;
+namespace vespalib { class CryptoEngine; }
+
+
class FRT_Supervisor : public FNET_IServerAdapter,
public FNET_IPacketHandler
{
@@ -26,12 +28,10 @@ public:
{
private:
FRT_ReflectionManager *_reflectionManager;
-
- RPCHooks(const RPCHooks &);
- RPCHooks &operator=(const RPCHooks &);
-
public:
- RPCHooks(FRT_ReflectionManager *reflect)
+ RPCHooks(const RPCHooks &) = delete;
+ RPCHooks &operator=(const RPCHooks &) = delete;
+ explicit RPCHooks(FRT_ReflectionManager *reflect)
: _reflectionManager(reflect) {}
void InitRPC(FRT_Supervisor *supervisor);
@@ -45,17 +45,15 @@ public:
public FNET_IPacketHandler
{
private:
- FRT_Supervisor &_parent;
- FRT_Method *_sessionInitHook;
- FRT_Method *_sessionDownHook;
- FRT_Method *_sessionFiniHook;
-
- ConnHooks(const ConnHooks &);
- ConnHooks &operator=(const ConnHooks &);
-
+ FRT_Supervisor &_parent;
+ std::unique_ptr<FRT_Method> _sessionInitHook;
+ std::unique_ptr<FRT_Method> _sessionDownHook;
+ std::unique_ptr<FRT_Method> _sessionFiniHook;
public:
- ConnHooks(FRT_Supervisor &parent);
- ~ConnHooks();
+ ConnHooks(const ConnHooks &) = delete;
+ ConnHooks &operator=(const ConnHooks &) = delete;
+ explicit ConnHooks(FRT_Supervisor &parent);
+ ~ConnHooks() override;
void SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler);
void SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler);
@@ -67,32 +65,23 @@ public:
};
private:
- FNET_Transport *_transport;
- FastOS_ThreadPool *_threadPool;
- bool _standAlone;
-
- FRT_PacketFactory _packetFactory;
- FNET_SimplePacketStreamer _packetStreamer;
- FNET_Connector *_connector;
- FRT_ReflectionManager _reflectionManager;
- RPCHooks _rpcHooks;
- ConnHooks _connHooks;
- FRT_Method *_methodMismatchHook;
-
- FRT_Supervisor(const FRT_Supervisor &);
- FRT_Supervisor &operator=(const FRT_Supervisor &);
+ FNET_Transport *_transport;
+ FRT_PacketFactory _packetFactory;
+ FNET_SimplePacketStreamer _packetStreamer;
+ FNET_Connector *_connector;
+ FRT_ReflectionManager _reflectionManager;
+ RPCHooks _rpcHooks;
+ ConnHooks _connHooks;
+ std::unique_ptr<FRT_Method> _methodMismatchHook;
public:
- FRT_Supervisor(FNET_Transport *transport, FastOS_ThreadPool *threadPool);
- FRT_Supervisor(vespalib::CryptoEngine::SP crypto, uint32_t threadStackSize = 65000, uint32_t maxThreads = 0);
- FRT_Supervisor(uint32_t threadStackSize = 65000, uint32_t maxThreads = 0)
- : FRT_Supervisor(vespalib::CryptoEngine::get_default(), threadStackSize, maxThreads) {}
- virtual ~FRT_Supervisor();
+ explicit FRT_Supervisor(FNET_Transport *transport);
+ FRT_Supervisor(const FRT_Supervisor &) = delete;
+ FRT_Supervisor &operator=(const FRT_Supervisor &) = delete;
+ ~FRT_Supervisor() override;
- bool StandAlone() { return _standAlone; }
FNET_Transport *GetTransport() { return _transport; }
FNET_Scheduler *GetScheduler();
- FastOS_ThreadPool *GetThreadPool() { return _threadPool; }
FRT_ReflectionManager *GetReflectionManager() { return &_reflectionManager; }
bool Listen(const char *spec);
@@ -100,8 +89,7 @@ public:
uint32_t GetListenPort() const;
FRT_Target *GetTarget(const char *spec);
- FRT_Target *Get2WayTarget(const char *spec,
- FNET_Context connContext = FNET_Context());
+ FRT_Target *Get2WayTarget(const char *spec, FNET_Context connContext = FNET_Context());
FRT_Target *GetTarget(int port);
FRT_RPCRequest *AllocRPCRequest(FRT_RPCRequest *tradein = nullptr);
@@ -120,17 +108,10 @@ public:
};
// methods for performing rpc invocations
- static void InvokeVoid(FNET_Connection *conn,
- FRT_RPCRequest *req);
- static void InvokeAsync(SchedulerPtr scheduler,
- FNET_Connection *conn,
- FRT_RPCRequest *req,
- double timeout,
- FRT_IRequestWait *waiter);
- static void InvokeSync(SchedulerPtr scheduler,
- FNET_Connection *conn,
- FRT_RPCRequest *req,
- double timeout);
+ static void InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req);
+ static void InvokeSync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout);
+ static void InvokeAsync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout, FRT_IRequestWait *waiter);
+
// FNET ServerAdapter Interface
bool InitAdminChannel(FNET_Channel *channel) override;
@@ -138,11 +119,26 @@ public:
// Packet Handling
HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override;
+};
- // Methods for controlling transport object in standalone mode
- bool Start();
- void Main();
- void ShutDown(bool waitFinished);
- void WaitFinished();
+namespace fnet::frt {
+
+/**
+ * This is a simple class that makes it easy to test RPC.
+ * Normally you do not want use it in production code as it hides your possibilites and responsibilities.
+ */
+class StandaloneFRT {
+public:
+ StandaloneFRT();
+ explicit StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto);
+ ~StandaloneFRT();
+ FRT_Supervisor & supervisor() { return *_supervisor; }
+ // TODO Remove this method as it is a relic from the ancient non-threaded world.
+ void wait_finished() const;
+private:
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _supervisor;
};
+}