aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-05-10 11:25:48 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-05-10 12:25:38 +0000
commit4412aace869986be3a1060f78f367841353d3384 (patch)
treef4b5e1f6da5eaf1563f3b2fd64779800acfd5796
parent840d4e0578dc627b75bcd0050f1b253e84cc30ed (diff)
Simplify the supervisor responsibility
-rw-r--r--config/src/apps/vespa-configproxy-cmd/proxycmd.cpp30
-rw-r--r--config/src/apps/vespa-configproxy-cmd/proxycmd.h3
-rw-r--r--config/src/apps/vespa-get-config/getconfig.cpp36
-rw-r--r--config/src/apps/vespa-ping-configproxy/pingproxy.cpp24
-rw-r--r--config/src/tests/failover/failover.cpp19
-rw-r--r--config/src/tests/file_acquirer/file_acquirer_test.cpp16
-rw-r--r--config/src/tests/frt/frt.cpp14
-rw-r--r--config/src/vespa/config/file_acquirer/file_acquirer.cpp10
-rw-r--r--config/src/vespa/config/file_acquirer/file_acquirer.h6
-rw-r--r--config/src/vespa/config/frt/frtconnectionpool.cpp9
-rw-r--r--config/src/vespa/config/frt/frtconnectionpool.h5
-rw-r--r--configd/src/apps/cmd/main.cpp13
-rw-r--r--configd/src/apps/sentinel/rpcserver.cpp12
-rw-r--r--configd/src/apps/sentinel/rpcserver.h2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp13
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h10
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp15
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h4
-rw-r--r--fnet/src/examples/frt/rpc/echo_client.cpp5
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_client.cpp5
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_server.cpp11
-rw-r--r--fnet/src/examples/frt/rpc/rpc_client.cpp5
-rw-r--r--fnet/src/examples/frt/rpc/rpc_info.cpp7
-rw-r--r--fnet/src/examples/frt/rpc/rpc_invoke.cpp5
-rw-r--r--fnet/src/examples/frt/rpc/rpc_proxy.cpp8
-rw-r--r--fnet/src/examples/frt/rpc/rpc_server.cpp6
-rw-r--r--fnet/src/tests/frt/method_pt/method_pt.cpp10
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp2
-rw-r--r--fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp15
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp15
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp24
-rw-r--r--fnet/src/tests/frt/rpc/session.cpp12
-rw-r--r--fnet/src/tests/frt/rpc/sharedblob.cpp8
-rw-r--r--fnet/src/tests/info/info.cpp5
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp193
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.h106
-rw-r--r--jrt_test/src/jrt-test/simpleserver/simpleserver.cpp11
-rw-r--r--jrt_test/src/tests/echo/echo-client.cpp5
-rw-r--r--jrt_test/src/tests/mandatory-methods/extract-reflection.cpp16
-rw-r--r--jrt_test/src/tests/mockup-invoke/mockup-server.cpp8
-rw-r--r--jrt_test/src/tests/rpc-error/test-errors.cpp12
-rw-r--r--logd/src/logd/config_subscriber.cpp6
-rw-r--r--logd/src/logd/config_subscriber.h3
-rw-r--r--logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp26
-rw-r--r--messagebus/src/tests/shutdown/shutdown.cpp11
-rw-r--r--messagebus/src/tests/targetpool/targetpool.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp2
-rw-r--r--searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp47
-rw-r--r--searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h3
-rw-r--r--searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp13
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.h5
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h4
-rw-r--r--slobrok/src/apps/check_slobrok/check_slobrok.cpp18
-rw-r--r--slobrok/src/apps/sbcmd/sbcmd.cpp18
-rw-r--r--slobrok/src/tests/configure/configure.cpp44
-rw-r--r--slobrok/src/tests/mirrorapi/mirrorapi.cpp26
-rw-r--r--slobrok/src/tests/oldapi/old.cpp30
-rw-r--r--slobrok/src/tests/registerapi/registerapi.cpp5
-rw-r--r--slobrok/src/tests/standalone/standalone.cpp25
-rw-r--r--slobrok/src/tests/startsome/rpc_info.cpp16
-rw-r--r--slobrok/src/tests/startsome/tstdst.cpp2
-rw-r--r--slobrok/src/vespa/slobrok/server/sbenv.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/fnetlistener.h14
-rw-r--r--storage/src/vespa/storage/tools/storage-cmd.cpp26
-rw-r--r--storageserver/src/tests/storageservertest.cpp75
-rw-r--r--vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp16
-rw-r--r--vespaclient/src/vespa/vespaclient/vesparoute/application.cpp6
72 files changed, 565 insertions, 660 deletions
diff --git a/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp b/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp
index 3407a880ec7..ce0665eae4a 100644
--- a/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp
+++ b/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp
@@ -16,41 +16,41 @@ Flags::Flags()
targethost("localhost"),
portnumber(19090)
{ }
-Flags::~Flags() { }
+Flags::~Flags() = default;
ProxyCmd::ProxyCmd(const Flags& flags)
- : _supervisor(NULL),
- _target(NULL),
- _req(NULL),
+ : _server(),
+ _supervisor(nullptr),
+ _target(nullptr),
+ _req(nullptr),
_flags(flags)
{ }
-ProxyCmd::~ProxyCmd() { }
+ProxyCmd::~ProxyCmd() = default;
void ProxyCmd::initRPC() {
- _supervisor = new FRT_Supervisor();
+ _server = std::make_unique<fnet::frt::StandaloneFRT>();
+ _supervisor = &_server->supervisor();
_req = _supervisor->AllocRPCRequest();
- _supervisor->Start();
}
void ProxyCmd::invokeRPC() {
- if (_req == NULL) return;
+ if (_req == nullptr) return;
_target->InvokeSync(_req, 65.0);
}
void ProxyCmd::finiRPC() {
- if (_req != NULL) {
+ if (_req != nullptr) {
_req->SubRef();
- _req = NULL;
+ _req = nullptr;
}
- if (_target != NULL) {
+ if (_target != nullptr) {
_target->SubRef();
_target = NULL;
}
- if (_supervisor != NULL) {
- _supervisor->ShutDown(true);
- delete _supervisor;
- _supervisor = NULL;
+ if (_server) {
+ _server.reset();
+ _supervisor = nullptr;
}
}
diff --git a/config/src/apps/vespa-configproxy-cmd/proxycmd.h b/config/src/apps/vespa-configproxy-cmd/proxycmd.h
index df8f429a3c9..49be0fff885 100644
--- a/config/src/apps/vespa-configproxy-cmd/proxycmd.h
+++ b/config/src/apps/vespa-configproxy-cmd/proxycmd.h
@@ -9,6 +9,8 @@ class FRT_Target;
class FRT_RPCRequest;
class FRT_Values;
+namespace fnet::frt { class StandaloneFRT; }
+
struct Flags {
vespalib::string method;
std::vector<vespalib::string> args;
@@ -23,6 +25,7 @@ struct Flags {
class ProxyCmd
{
private:
+ std::unique_ptr<fnet::frt::StandaloneFRT> _server;
FRT_Supervisor *_supervisor;
FRT_Target *_target;
FRT_RPCRequest *_req;
diff --git a/config/src/apps/vespa-get-config/getconfig.cpp b/config/src/apps/vespa-get-config/getconfig.cpp
index 290beacf5a4..f23f28b2734 100644
--- a/config/src/apps/vespa-get-config/getconfig.cpp
+++ b/config/src/apps/vespa-get-config/getconfig.cpp
@@ -20,6 +20,7 @@ using namespace config;
class GetConfig : public FastOS_Application
{
private:
+ std::unique_ptr<fnet::frt::StandaloneFRT> _server;
FRT_Supervisor *_supervisor;
FRT_Target *_target;
@@ -27,7 +28,7 @@ private:
GetConfig &operator=(const GetConfig &);
public:
- GetConfig() : _supervisor(NULL), _target(NULL) {}
+ GetConfig() : _server(), _supervisor(nullptr), _target(nullptr) {}
virtual ~GetConfig();
int usage();
void initRPC(const char *spec);
@@ -38,8 +39,8 @@ public:
GetConfig::~GetConfig()
{
- LOG_ASSERT(_supervisor == NULL);
- LOG_ASSERT(_target == NULL);
+ LOG_ASSERT(_supervisor == nullptr);
+ LOG_ASSERT(_target == nullptr);
}
@@ -71,23 +72,22 @@ GetConfig::usage()
void
GetConfig::initRPC(const char *spec)
{
- _supervisor = new FRT_Supervisor();
+ _server = std::make_unique<fnet::frt::StandaloneFRT>();
+ _supervisor = &_server->supervisor();
_target = _supervisor->GetTarget(spec);
- _supervisor->Start();
}
void
GetConfig::finiRPC()
{
- if (_target != NULL) {
+ if (_target != nullptr) {
_target->SubRef();
- _target = NULL;
+ _target = nullptr;
}
- if (_supervisor != NULL) {
- _supervisor->ShutDown(true);
- delete _supervisor;
- _supervisor = NULL;
+ if (_server) {
+ _server.reset();
+ _supervisor = nullptr;
}
}
@@ -99,8 +99,8 @@ GetConfig::Main()
char c = -1;
std::vector<vespalib::string> defSchema;
- const char *schema = NULL;
- const char *defName = NULL;
+ const char *schema = nullptr;
+ const char *defName = nullptr;
const char *defMD5 = "";
std::string defNamespace("config");
const char *serverHost = "localhost";
@@ -110,7 +110,7 @@ GetConfig::Main()
const char *vespaVersionString = nullptr;
int64_t generation = 0;
- if (configId == NULL) {
+ if (configId == nullptr) {
configId = "";
}
const char *configMD5 = "";
@@ -119,7 +119,7 @@ GetConfig::Main()
int serverPort = 19090;
- const char *optArg = NULL;
+ const char *optArg = nullptr;
int optInd = 0;
while ((c = GetOpt("a:n:v:g:i:jlm:c:t:V:w:r:s:p:dh", optArg, optInd)) != -1) {
int retval = 1;
@@ -181,19 +181,19 @@ GetConfig::Main()
}
}
- if (defName == NULL || serverPort == 0) {
+ if (defName == nullptr || serverPort == 0) {
usage();
return 1;
}
- if (strchr(defName, '.') != NULL) {
+ if (strchr(defName, '.') != nullptr) {
const char *tmp = defName;
defName = strrchr(defName, '.');
defName++;
defNamespace = std::string(tmp, defName - tmp - 1);
}
- if (schema != NULL) {
+ if (schema != nullptr) {
std::ifstream is;
is.open(schema);
std::string item;
diff --git a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
index a88d7deb79a..d681473ce34 100644
--- a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
+++ b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp
@@ -12,6 +12,7 @@ LOG_SETUP("vespa-ping-configproxy");
class PingProxy : public FastOS_Application
{
private:
+ std::unique_ptr<fnet::frt::StandaloneFRT> _server;
FRT_Supervisor *_supervisor;
FRT_Target *_target;
@@ -19,7 +20,7 @@ private:
PingProxy &operator=(const PingProxy &);
public:
- PingProxy() : _supervisor(NULL), _target(NULL) {}
+ PingProxy() : _server(), _supervisor(nullptr), _target(nullptr) {}
virtual ~PingProxy();
int usage();
void initRPC(const char *spec);
@@ -30,8 +31,8 @@ public:
PingProxy::~PingProxy()
{
- LOG_ASSERT(_supervisor == NULL);
- LOG_ASSERT(_target == NULL);
+ LOG_ASSERT(_supervisor == nullptr);
+ LOG_ASSERT(_target == nullptr);
}
@@ -48,23 +49,22 @@ PingProxy::usage()
void
PingProxy::initRPC(const char *spec)
{
- _supervisor = new FRT_Supervisor();
+ _server = std::make_unique<fnet::frt::StandaloneFRT>();
+ _supervisor = &_server->supervisor();
_target = _supervisor->GetTarget(spec);
- _supervisor->Start();
}
void
PingProxy::finiRPC()
{
- if (_target != NULL) {
+ if (_target != nullptr) {
_target->SubRef();
- _target = NULL;
+ _target = nullptr;
}
- if (_supervisor != NULL) {
- _supervisor->ShutDown(true);
- delete _supervisor;
- _supervisor = NULL;
+ if (_server) {
+ _server.reset();
+ _supervisor = nullptr;
}
}
@@ -80,7 +80,7 @@ PingProxy::Main()
int clientTimeout = 5;
int serverPort = 19090;
- const char *optArg = NULL;
+ const char *optArg = nullptr;
int optInd = 0;
while ((c = GetOpt("w:s:p:dh", optArg, optInd)) != -1) {
switch (c) {
diff --git a/config/src/tests/failover/failover.cpp b/config/src/tests/failover/failover.cpp
index 990ca761e7e..7e7017ff057 100644
--- a/config/src/tests/failover/failover.cpp
+++ b/config/src/tests/failover/failover.cpp
@@ -34,7 +34,7 @@ struct RPCServer : public FRT_Invokable {
vespalib::Barrier barrier;
int64_t gen;
- RPCServer() : supervisor(NULL), barrier(2), gen(1) { }
+ RPCServer() : supervisor(nullptr), barrier(2), gen(1) { }
void init(FRT_Supervisor * s) {
FRT_ReflectionBuilder rb(s);
@@ -81,18 +81,20 @@ struct RPCServer : public FRT_Invokable {
void verifyConfig(std::unique_ptr<MyConfig> config)
{
- ASSERT_TRUE(config.get() != NULL);
+ ASSERT_TRUE(config);
ASSERT_EQUAL("myval", config->myField);
}
struct ServerFixture {
using UP = std::unique_ptr<ServerFixture>;
+ std::unique_ptr<fnet::frt::StandaloneFRT> frtServer;
FRT_Supervisor * supervisor;
RPCServer server;
Barrier b;
const vespalib::string listenSpec;
ServerFixture(const vespalib::string & ls)
- : supervisor(NULL),
+ : frtServer(),
+ supervisor(nullptr),
server(),
b(2),
listenSpec(ls)
@@ -106,22 +108,21 @@ struct ServerFixture {
void start()
{
- supervisor = new FRT_Supervisor();
+ frtServer = std::make_unique<fnet::frt::StandaloneFRT>();
+ supervisor = & frtServer->supervisor();
server.init(supervisor);
supervisor->Listen(get_port(listenSpec));
wait(); // Wait until test runner signals we can start
- supervisor->Main();
wait(); // Signalling that we have shut down
wait(); // Wait for signal saying that supervisor is deleted
}
void stop()
{
- if (supervisor != NULL) {
- supervisor->ShutDown(true);
+ if (frtServer) {
wait(); // Wait for supervisor to shut down
- delete supervisor;
- supervisor = NULL;
+ frtServer.reset();
+ supervisor = nullptr;
wait(); // Signal that we are done and start can return.
}
}
diff --git a/config/src/tests/file_acquirer/file_acquirer_test.cpp b/config/src/tests/file_acquirer/file_acquirer_test.cpp
index 0453c6ddbd0..33bb8f47e09 100644
--- a/config/src/tests/file_acquirer/file_acquirer_test.cpp
+++ b/config/src/tests/file_acquirer/file_acquirer_test.cpp
@@ -7,8 +7,10 @@
using namespace config;
struct ServerFixture : FRT_Invokable {
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor &orb;
vespalib::string spec;
+
void init_rpc() {
FRT_ReflectionBuilder rb(&orb);
rb.DefineMethod("waitFor", "s", "s", FRT_METHOD(ServerFixture::RPC_waitFor), this);
@@ -16,24 +18,24 @@ struct ServerFixture : FRT_Invokable {
rb.ParamDesc("file_ref", "file reference to wait for and resolve");
rb.ReturnDesc("file_path", "actual path to the requested file");
}
- ServerFixture() : orb() {
+
+ ServerFixture() : server(), orb(server.supervisor()) {
init_rpc();
orb.Listen(0);
spec = vespalib::make_string("tcp/localhost:%d", orb.GetListenPort());
- orb.Start();
}
+
void RPC_waitFor(FRT_RPCRequest *req) {
FRT_Values &params = *req->GetParams();
- FRT_Values &ret = *req->GetReturn();
+ FRT_Values &ret = *req->GetReturn();
if (strcmp(params[0]._string._str, "my_ref") == 0) {
ret.AddString("my_path");
} else {
req->SetError(FRTE_RPC_METHOD_FAILED, "invalid file reference");
}
}
- ~ServerFixture() {
- orb.ShutDown(true);
- }
+
+ ~ServerFixture() = default;
};
TEST_FF("require that files can be acquired over rpc", ServerFixture(), RpcFileAcquirer(f1.spec)) {
diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp
index c225dd9dcf3..f489ca4c7d9 100644
--- a/config/src/tests/frt/frt.cpp
+++ b/config/src/tests/frt/frt.cpp
@@ -119,17 +119,18 @@ namespace {
int errorCode;
int timeout;
FRT_RPCRequest * ans;
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor;
FNET_Scheduler scheduler;
vespalib::string address;
- ConnectionMock(FRT_RPCRequest * answer = NULL);
+ ConnectionMock(FRT_RPCRequest * answer = nullptr);
~ConnectionMock();
FRT_RPCRequest * allocRPCRequest() override { return supervisor.AllocRPCRequest(); }
void setError(int ec) override { errorCode = ec; }
void invoke(FRT_RPCRequest * req, double t, FRT_IRequestWait * waiter) override
{
timeout = static_cast<int>(t);
- if (ans != NULL)
+ if (ans != nullptr)
waiter->RequestDone(ans);
else
waiter->RequestDone(req);
@@ -142,10 +143,11 @@ namespace {
: errorCode(0),
timeout(0),
ans(answer),
- supervisor(),
+ server(),
+ supervisor(server.supervisor()),
address()
{ }
- ConnectionMock::~ConnectionMock() { }
+ ConnectionMock::~ConnectionMock() = default;
struct FactoryMock : public ConnectionFactory {
ConnectionMock * current;
@@ -284,7 +286,7 @@ TEST("require that v3 request is correctly initialized") {
ConfigDefinition origDef(MyConfig::CONFIG_DEF_SCHEMA);
FRT_RPCRequest * req = v3req.getRequest();
- ASSERT_TRUE(req != NULL);
+ ASSERT_TRUE(req != nullptr);
FRT_Values & params(*req->GetParams());
std::string json(params[0]._string._str);
Slime slime;
diff --git a/config/src/vespa/config/file_acquirer/file_acquirer.cpp b/config/src/vespa/config/file_acquirer/file_acquirer.cpp
index 140d63aa7e6..a61f480b33f 100644
--- a/config/src/vespa/config/file_acquirer/file_acquirer.cpp
+++ b/config/src/vespa/config/file_acquirer/file_acquirer.cpp
@@ -4,6 +4,8 @@
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fastos/thread.h>
#include <vespa/log/log.h>
LOG_SETUP(".config.file_acquirer");
@@ -11,10 +13,12 @@ LOG_SETUP(".config.file_acquirer");
namespace config {
RpcFileAcquirer::RpcFileAcquirer(const vespalib::string &spec)
- : _orb(std::make_unique<FRT_Supervisor>()),
+ : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_spec(spec)
{
- _orb->Start();
+ _transport->Start(_threadPool.get());
}
vespalib::string
@@ -39,7 +43,7 @@ RpcFileAcquirer::wait_for(const vespalib::string &file_ref, double timeout_s)
RpcFileAcquirer::~RpcFileAcquirer()
{
- _orb->ShutDown(true);
+ _transport->ShutDown(true);
}
} // namespace config
diff --git a/config/src/vespa/config/file_acquirer/file_acquirer.h b/config/src/vespa/config/file_acquirer/file_acquirer.h
index 77b0cf9a821..844e277cbf9 100644
--- a/config/src/vespa/config/file_acquirer/file_acquirer.h
+++ b/config/src/vespa/config/file_acquirer/file_acquirer.h
@@ -4,6 +4,8 @@
#include <vespa/vespalib/stllike/string.h>
class FRT_Supervisor;
+class FNET_Transport;
+class FastOS_ThreadPool;
namespace config {
@@ -23,12 +25,14 @@ struct FileAcquirer {
class RpcFileAcquirer : public FileAcquirer
{
private:
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _orb;
vespalib::string _spec;
public:
RpcFileAcquirer(const vespalib::string &spec);
vespalib::string wait_for(const vespalib::string &file_ref, double timeout_s) override;
- ~RpcFileAcquirer();
+ ~RpcFileAcquirer() override;
};
} // namespace config
diff --git a/config/src/vespa/config/frt/frtconnectionpool.cpp b/config/src/vespa/config/frt/frtconnectionpool.cpp
index 4bea8062a03..b7440ceb7f0 100644
--- a/config/src/vespa/config/frt/frtconnectionpool.cpp
+++ b/config/src/vespa/config/frt/frtconnectionpool.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/host_name.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/transport.h>
+#include <vespa/fastos/thread.h>
namespace config {
@@ -26,7 +27,9 @@ FRTConnectionPool::FRTConnectionKey::operator==(const FRTConnectionKey& right) c
}
FRTConnectionPool::FRTConnectionPool(const ServerSpec & spec, const TimingValues & timingValues)
- : _supervisor(std::make_unique<FRT_Supervisor>()),
+ : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
_selectIdx(0),
_hostname("")
{
@@ -35,12 +38,12 @@ FRTConnectionPool::FRTConnectionPool(const ServerSpec & spec, const TimingValues
_connections[key] = std::make_shared<FRTConnection>(spec.getHost(i), *_supervisor, timingValues);
}
setHostname();
- _supervisor->Start();
+ _transport->Start(_threadPool.get());
}
FRTConnectionPool::~FRTConnectionPool()
{
- _supervisor->ShutDown(true);
+ _transport->ShutDown(true);
}
void
diff --git a/config/src/vespa/config/frt/frtconnectionpool.h b/config/src/vespa/config/frt/frtconnectionpool.h
index e671c756db5..0b30e723272 100644
--- a/config/src/vespa/config/frt/frtconnectionpool.h
+++ b/config/src/vespa/config/frt/frtconnectionpool.h
@@ -7,6 +7,9 @@
#include <vector>
#include <map>
+class FNET_Transport;
+class FastOS_ThreadPool;
+
namespace config {
class FRTConnectionPool : public ConnectionFactory {
@@ -29,6 +32,8 @@ private:
int operator==(const FRTConnectionKey& right) const;
};
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _supervisor;
int _selectIdx;
vespalib::string _hostname;
diff --git a/configd/src/apps/cmd/main.cpp b/configd/src/apps/cmd/main.cpp
index 6228de86a80..e448fff1c6d 100644
--- a/configd/src/apps/cmd/main.cpp
+++ b/configd/src/apps/cmd/main.cpp
@@ -14,7 +14,8 @@ LOG_SETUP("vespa-sentinel-cmd");
class Cmd
{
private:
- std::unique_ptr<FRT_Supervisor> _supervisor;
+ std::unique_ptr<fnet::frt::StandaloneFRT> _server;
+ FRT_Supervisor *_supervisor;
FRT_Target *_target;
public:
@@ -44,9 +45,9 @@ void usage()
void
Cmd::initRPC(const char *spec)
{
- _supervisor = std::make_unique<FRT_Supervisor>();
+ _server = std::make_unique<fnet::frt::StandaloneFRT>();
+ _supervisor = & _server->supervisor();
_target = _supervisor->GetTarget(spec);
- _supervisor->Start();
}
@@ -57,9 +58,9 @@ Cmd::finiRPC()
_target->SubRef();
_target = nullptr;
}
- if (_supervisor) {
- _supervisor->ShutDown(true);
- _supervisor.reset();
+ if (_server) {
+ _server.reset();
+ _supervisor = nullptr;
}
}
diff --git a/configd/src/apps/sentinel/rpcserver.cpp b/configd/src/apps/sentinel/rpcserver.cpp
index b719d012304..a49cba50e4d 100644
--- a/configd/src/apps/sentinel/rpcserver.cpp
+++ b/configd/src/apps/sentinel/rpcserver.cpp
@@ -8,22 +8,18 @@ LOG_SETUP(".rpcserver");
namespace config::sentinel {
RpcServer::RpcServer(int portNumber, CommandQueue &cmdQ)
- : _supervisor(),
+ : _server(),
_rpcHooks(cmdQ),
_port(portNumber)
{
- _rpcHooks.initRPC(&_supervisor);
- if (_supervisor.Listen(portNumber)) {
+ _rpcHooks.initRPC(&_server.supervisor());
+ if (_server.supervisor().Listen(portNumber)) {
LOG(config, "listening on port %d", portNumber);
- _supervisor.Start();
} else {
LOG(error, "unable to listen to port %d", portNumber);
}
}
-RpcServer::~RpcServer()
-{
- _supervisor.ShutDown(true);
-}
+RpcServer::~RpcServer() = default;
} // namespace config::sentinel
diff --git a/configd/src/apps/sentinel/rpcserver.h b/configd/src/apps/sentinel/rpcserver.h
index f295975f224..ef4b394fdca 100644
--- a/configd/src/apps/sentinel/rpcserver.h
+++ b/configd/src/apps/sentinel/rpcserver.h
@@ -13,7 +13,7 @@ namespace config::sentinel {
class RpcServer
{
private:
- FRT_Supervisor _supervisor;
+ fnet::frt::StandaloneFRT _server;
RPCHooks _rpcHooks;
int _port;
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
index e00164b5a1a..757f3a976ed 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
@@ -20,7 +20,9 @@ namespace documentapi {
ExternPolicy::ExternPolicy(const string &param) :
_lock(),
- _orb(std::make_unique<FRT_Supervisor>()),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_mirror(),
_pattern(),
_session(),
@@ -56,7 +58,7 @@ ExternPolicy::ExternPolicy(const string &param) :
slobrok::ConfiguratorFactory config(spec);
_mirror.reset(new MirrorAPI(*_orb, config));
- _started = _orb->Start();
+ _started = _transport->Start(_threadPool.get());
if (!_started) {
_error = "Failed to start FNET supervisor.";
return;
@@ -81,7 +83,7 @@ ExternPolicy::~ExternPolicy()
{
_mirror.reset();
if (_started) {
- _orb->ShutDown(true);
+ _transport->ShutDown(true);
}
}
@@ -133,10 +135,9 @@ ExternPolicy::update()
IMirrorAPI::SpecList entries = _mirror->lookup(_pattern);
if (!entries.empty()) {
- for (IMirrorAPI::SpecList::iterator it = entries.begin();
- it != entries.end(); ++it)
+ for (const auto & spec : entries)
{
- _recipients.push_back(mbus::Hop::parse(it->second + _session));
+ _recipients.push_back(mbus::Hop::parse(spec.second + _session));
}
}
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h
index 61182071a54..c56f8f214ec 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h
@@ -9,6 +9,8 @@
#include <vespa/documentapi/common.h>
class FRT_Supervisor;
+class FNET_Transport;
+class FastOS_ThreadPool;
namespace documentapi {
@@ -19,9 +21,11 @@ namespace documentapi {
class ExternPolicy : public mbus::IRoutingPolicy {
private:
using IMirrorAPI = slobrok::api::IMirrorAPI;
- vespalib::Lock _lock;
- std::unique_ptr<FRT_Supervisor> _orb;
- std::unique_ptr<IMirrorAPI> _mirror;
+ vespalib::Lock _lock;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ std::unique_ptr<IMirrorAPI> _mirror;
string _pattern;
string _session;
string _error;
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
index 5bbfa75012d..18dd525b066 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
@@ -5,6 +5,8 @@
#include <vespa/messagebus/routing/routingcontext.h>
#include <vespa/fnet/frt/frt.h>
#include <vespa/slobrok/sbmirror.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fastos/thread.h>
using slobrok::api::IMirrorAPI;
using slobrok::api::MirrorAPI;
@@ -14,7 +16,9 @@ namespace documentapi {
ExternSlobrokPolicy::ExternSlobrokPolicy(const std::map<string, string>& param)
: AsyncInitializationPolicy(param),
_firstTry(true),
- _orb(std::make_unique<FRT_Supervisor>()),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_slobrokConfigId("admin/slobrok.0")
{
if (param.find("config") != param.end()) {
@@ -42,10 +46,10 @@ ExternSlobrokPolicy::ExternSlobrokPolicy(const std::map<string, string>& param)
ExternSlobrokPolicy::~ExternSlobrokPolicy()
{
- bool started = _mirror.get() != NULL;
+ bool started = (bool)_mirror;
_mirror.reset();
if (started) {
- _orb->ShutDown(true);
+ _transport->ShutDown(true);
}
}
@@ -56,13 +60,12 @@ string ExternSlobrokPolicy::init() {
} else if (_configSources.size() != 0) {
slobrok::ConfiguratorFactory config(
config::ConfigUri(_slobrokConfigId,
- config::IConfigContext::SP(
- new config::ConfigContext(config::ServerSpec(_configSources)))));
+ std::make_unique<config::ConfigContext>(config::ServerSpec(_configSources))));
_mirror.reset(new MirrorAPI(*_orb, config));
}
if (_mirror.get()) {
- _orb->Start();
+ _transport->Start(_threadPool.get());
}
return "";
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h
index dc0534095b6..d2966f852d5 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h
@@ -9,6 +9,8 @@
#include <vespa/config/subscription/sourcespec.h>
class FRT_Supervisor;
+class FNET_Transport;
+class FastOS_ThreadPool;
namespace documentapi {
@@ -22,6 +24,8 @@ protected:
bool _firstTry;
config::ServerSpec::HostSpecList _configSources;
vespalib::Lock _lock;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _orb;
std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
std::vector<std::string> _slobroks;
diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp
index 9d73d38cd1e..cc406224135 100644
--- a/fnet/src/examples/frt/rpc/echo_client.cpp
+++ b/fnet/src/examples/frt/rpc/echo_client.cpp
@@ -19,9 +19,9 @@ EchoClient::Main()
printf("usage : echo_client <connectspec>\n");
return 1;
}
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
- supervisor.Start();
FRT_Target *target = supervisor.GetTarget(_argv[1]);
FRT_RPCRequest *req = supervisor.AllocRPCRequest();
FRT_Values *args = req->GetParams();
@@ -84,7 +84,6 @@ EchoClient::Main()
printf("Return values != parameters.\n");
}
req->SubRef();
- supervisor.ShutDown(true);
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
index 7c6434e870a..726a500cc55 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
@@ -47,9 +47,9 @@ MyApp::Main()
}
bool ok = true;
RPC rpc;
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb = server.supervisor();
rpc.Init(&orb);
- orb.Start();
FRT_Target *target = orb.Get2WayTarget(_argv[1]);
FRT_RPCRequest *req = orb.AllocRPCRequest();
@@ -102,7 +102,6 @@ MyApp::Main()
req->SubRef();
target->SubRef();
- orb.ShutDown(true);
return ok ? 0 : 1;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
index ac7b34ebda0..7e0caaba00d 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
@@ -60,11 +60,12 @@ MyApp::Main()
return 1;
}
RPC rpc;
- FRT_Supervisor orb;
- rpc.Init(&orb);
- orb.Listen(_argv[1]);
- FNET_SignalShutDown ssd(*orb.GetTransport());
- orb.Main();
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
+ rpc.Init(&supervisor);
+ supervisor.Listen(_argv[1]);
+ FNET_SignalShutDown ssd(*supervisor.GetTransport());
+ server.wait_finished();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp
index cc230d2bc7c..fc1d54d3440 100644
--- a/fnet/src/examples/frt/rpc/rpc_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_client.cpp
@@ -19,9 +19,9 @@ RPCClient::Main()
printf("usage : rpc_client <connectspec>\n");
return 1;
}
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
- supervisor.Start();
FRT_Target *target = supervisor.GetTarget(_argv[1]);
const char *str1 = "abc";
@@ -80,7 +80,6 @@ RPCClient::Main()
req->SubRef();
target->SubRef();
- supervisor.ShutDown(true);
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp
index 5f17e69a10d..d90d22d1986 100644
--- a/fnet/src/examples/frt/rpc/rpc_info.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_info.cpp
@@ -79,11 +79,12 @@ RPCInfo::Main()
}
bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0);
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
+
FRT_Target *target = supervisor.GetTarget(_argv[1]);
FRT_RPCRequest *m_list = nullptr;
FRT_RPCRequest *info = nullptr;
- supervisor.Start();
GetReq(&info, &supervisor);
info->SetMethodName("frt.rpc.ping");
@@ -91,7 +92,6 @@ RPCInfo::Main()
if (info->IsError()) {
fprintf(stderr, "Error talking to %s\n", _argv[1]);
FreeReqs(m_list, info);
- supervisor.ShutDown(true);
return 1;
}
@@ -129,7 +129,6 @@ RPCInfo::Main()
}
FreeReqs(m_list, info);
target->SubRef();
- supervisor.ShutDown(true);
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
index ea34ffa6905..fb82622a537 100644
--- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp
@@ -71,8 +71,8 @@ int
RPCClient::run()
{
int retCode = 0;
- FRT_Supervisor supervisor;
- supervisor.Start();
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
int targetArg = 1;
int methNameArg = 2;
int startOfArgs = 3;
@@ -109,7 +109,6 @@ RPCClient::run()
}
req->SubRef();
target->SubRef();
- supervisor.ShutDown(true);
return retCode;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_proxy.cpp b/fnet/src/examples/frt/rpc/rpc_proxy.cpp
index 496190b2b80..fa152dcec10 100644
--- a/fnet/src/examples/frt/rpc/rpc_proxy.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_proxy.cpp
@@ -224,18 +224,18 @@ App::Main()
}
bool verbose = (_argc > 3) && (strcmp(_argv[3], "verbose") == 0);
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
RPCProxy proxy(supervisor, _argv[2], verbose);
supervisor.GetReflectionManager()->Reset();
supervisor.SetSessionInitHook(FRT_METHOD(RPCProxy::HOOK_Init), &proxy);
supervisor.SetSessionDownHook(FRT_METHOD(RPCProxy::HOOK_Down), &proxy);
supervisor.SetSessionFiniHook(FRT_METHOD(RPCProxy::HOOK_Fini), &proxy);
- supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch),
- &proxy);
+ supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch), &proxy);
supervisor.Listen(_argv[1]);
FNET_SignalShutDown ssd(*supervisor.GetTransport());
- supervisor.Main();
+ server.wait_finished();
return 0;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp
index 03d618133c9..cc3972166ad 100644
--- a/fnet/src/examples/frt/rpc/rpc_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_server.cpp
@@ -91,12 +91,12 @@ RPCServer::Main(int argc, char **argv)
return 1;
}
- _supervisor = new FRT_Supervisor();
+ fnet::frt::StandaloneFRT server;
+ _supervisor = &server.supervisor();
InitRPC(_supervisor);
_supervisor->Listen(argv[1]);
FNET_SignalShutDown ssd(*_supervisor->GetTransport());
- _supervisor->Main();
- delete _supervisor;
+ server.wait_finished();
return 0;
}
diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp
index 5417fddceeb..608a435bd1d 100644
--- a/fnet/src/tests/frt/method_pt/method_pt.cpp
+++ b/fnet/src/tests/frt/method_pt/method_pt.cpp
@@ -18,6 +18,7 @@ class ComplexHandler3;
Test *_test;
+std::unique_ptr<fnet::frt::StandaloneFRT> _server;
FRT_Supervisor *_supervisor;
FRT_Target *_target;
SimpleHandler *_simpleHandler;
@@ -176,7 +177,8 @@ public:
//-------------------------------------------------------------
void initTest() {
- _supervisor = new FRT_Supervisor();
+ _server = std::make_unique<fnet::frt::StandaloneFRT>();
+ _supervisor = &_server->supervisor();
_simpleHandler = new SimpleHandler();
_mediumHandler1 = new MediumHandler1();
_mediumHandler2 = new MediumHandler2();
@@ -200,9 +202,6 @@ void initTest() {
_target = _supervisor->GetTarget(spec.c_str());
ASSERT_TRUE(_target != nullptr);
- bool startOK = _supervisor->Start();
- ASSERT_TRUE(startOK);
-
FRT_ReflectionBuilder rb(_supervisor);
//-------------------------------------------------------------------
@@ -247,7 +246,6 @@ void initTest() {
void finiTest() {
- _supervisor->ShutDown(true);
delete _complexHandler1;
delete _complexHandler2;
delete _complexHandler3;
@@ -256,7 +254,7 @@ void finiTest() {
delete _mediumHandler3;
delete _simpleHandler;
_target->SubRef();
- delete _supervisor;
+ _server.reset();
}
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index 2441ea6eaa0..ed4911175a0 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -15,7 +15,7 @@ struct Rpc : FRT_Invokable {
FNET_Transport transport;
FRT_Supervisor orb;
Rpc(CryptoEngine::SP crypto, size_t num_threads)
- : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport, &thread_pool) {}
+ : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport) {}
void start() {
ASSERT_TRUE(transport.Start(&thread_pool));
}
diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
index 0e5f4712e61..cdb2636a8c1 100644
--- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp
@@ -21,18 +21,19 @@ CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespal
TT_Tag req_tag("request");
struct Fixture : FRT_Invokable {
- FRT_Supervisor orb;
- Fixture(CryptoEngine::SP crypto) : orb(std::move(crypto)) {
- ASSERT_TRUE(orb.Listen(0));
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb;
+ Fixture(CryptoEngine::SP crypto)
+ : server(std::move(crypto)),
+ orb(server.supervisor())
+ {
+ ASSERT_TRUE(orb.Listen(0));
init_rpc();
- ASSERT_TRUE(orb.Start());
}
FRT_Target *connect() {
return orb.GetTarget(orb.GetListenPort());
}
- ~Fixture() {
- orb.ShutDown(true);
- }
+ ~Fixture() = default;
void init_rpc() {
FRT_ReflectionBuilder rb(&orb);
rb.DefineMethod("inc", "l", "l", FRT_METHOD(Fixture::rpc_inc), this);
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index ab21c62bb68..43a61cd9bcd 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -39,13 +39,13 @@ struct Server : public FRT_Invokable
TEST("detach return invoke") {
Receptor receptor;
- FRT_Supervisor orb;
- Server server(orb, receptor);
- ASSERT_TRUE(orb.Listen(0));
- ASSERT_TRUE(orb.Start());
- std::string spec = vespalib::make_string("tcp/localhost:%d", orb.GetListenPort());
- FRT_Target *target = orb.Get2WayTarget(spec.c_str());
- FRT_RPCRequest *req = orb.AllocRPCRequest();
+ fnet::frt::StandaloneFRT frtServer;
+ FRT_Supervisor & supervisor = frtServer.supervisor();
+ Server server(supervisor, receptor);
+ ASSERT_TRUE(supervisor.Listen(0));
+ std::string spec = vespalib::make_string("tcp/localhost:%d", supervisor.GetListenPort());
+ FRT_Target *target = supervisor.Get2WayTarget(spec.c_str());
+ FRT_RPCRequest *req = supervisor.AllocRPCRequest();
req->SetMethodName("hook");
target->InvokeSync(req, 5.0);
@@ -58,7 +58,6 @@ TEST("detach return invoke") {
}
req->SubRef();
target->SubRef();
- orb.ShutDown(true);
if (receptor.req != 0) {
EXPECT_TRUE(!receptor.req->IsError());
receptor.req->SubRef();
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index a84e3049704..410a60fa08a 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -249,8 +249,8 @@ public:
class Fixture
{
private:
- FRT_Supervisor _client;
- FRT_Supervisor _server;
+ fnet::frt::StandaloneFRT _client;
+ fnet::frt::StandaloneFRT _server;
vespalib::string _peerSpec;
FRT_Target *_target;
TestRPC _testRPC;
@@ -258,7 +258,7 @@ private:
public:
FRT_Target &target() { return *_target; }
- FRT_Target *make_bad_target() { return _client.GetTarget("bogus address"); }
+ FRT_Target *make_bad_target() { return _client.supervisor().GetTarget("bogus address"); }
RequestLatch &detached_req() { return _testRPC.detached_req(); }
EchoTest &echo() { return _echoTest; }
@@ -267,16 +267,14 @@ public:
_server(crypto),
_peerSpec(),
_target(nullptr),
- _testRPC(&_server),
- _echoTest(&_server)
+ _testRPC(&_server.supervisor()),
+ _echoTest(&_server.supervisor())
{
- _client.GetTransport()->SetTCPNoDelay(true);
- _server.GetTransport()->SetTCPNoDelay(true);
- ASSERT_TRUE(_server.Listen("tcp/0"));
- ASSERT_TRUE(_server.Start());
- ASSERT_TRUE(_client.Start());
- _peerSpec = SocketSpec::from_host_port("localhost", _server.GetListenPort()).spec();
- _target = _client.GetTarget(_peerSpec.c_str());
+ _client.supervisor().GetTransport()->SetTCPNoDelay(true);
+ _server.supervisor().GetTransport()->SetTCPNoDelay(true);
+ ASSERT_TRUE(_server.supervisor().Listen("tcp/0"));
+ _peerSpec = SocketSpec::from_host_port("localhost", _server.supervisor().GetListenPort()).spec();
+ _target = _client.supervisor().GetTarget(_peerSpec.c_str());
//---------------------------------------------------------------------
MyReq req("frt.rpc.ping");
target().InvokeSync(req.borrow(), timeout);
@@ -284,8 +282,6 @@ public:
}
~Fixture() {
- _client.ShutDown(true);
- _server.ShutDown(true);
_target->SubRef();
}
};
diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp
index 96d20ae9c18..24cbedb3ff7 100644
--- a/fnet/src/tests/frt/rpc/session.cpp
+++ b/fnet/src/tests/frt/rpc/session.cpp
@@ -86,14 +86,13 @@ struct RPC : public FRT_Invokable
}
};
-TEST("session") {
- RPC rpc;
- FRT_Supervisor orb(crypto);
+void testSession(RPC & rpc) {
+ fnet::frt::StandaloneFRT frt(crypto);
+ FRT_Supervisor & orb = frt.supervisor();
char spec[64];
rpc.Init(&orb);
ASSERT_TRUE(orb.Listen("tcp/0"));
sprintf(spec, "tcp/localhost:%d", orb.GetListenPort());
- ASSERT_TRUE(orb.Start());
FRT_Target *target = orb.GetTarget(spec);
FRT_RPCRequest *req = orb.AllocRPCRequest();
@@ -122,7 +121,10 @@ TEST("session") {
req->SubRef();
target->SubRef();
- orb.ShutDown(true);
+}
+TEST("session") {
+ RPC rpc;
+ testSession(rpc);
EXPECT_TRUE(Session::GetCnt() == 0);
EXPECT_TRUE(!rpc.bogusFini);
};
diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp
index a48ecbb1da7..1c0503454c7 100644
--- a/fnet/src/tests/frt/rpc/sharedblob.cpp
+++ b/fnet/src/tests/frt/rpc/sharedblob.cpp
@@ -118,7 +118,8 @@ struct ServerSampler : public FRT_Invokable
};
TEST("testExplicitShared") {
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT frt;
+ FRT_Supervisor & orb = frt.supervisor();
MyBlob blob;
FRT_RPCRequest *req = orb.AllocRPCRequest();
@@ -171,7 +172,8 @@ TEST("testExplicitShared") {
TEST("testImplicitShared") {
DataSet dataSet;
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT frt;
+ FRT_Supervisor & orb = frt.supervisor();
FRT_RPCRequest *req = orb.AllocRPCRequest();
ServerSampler serverSampler(dataSet, req);
{
@@ -182,7 +184,6 @@ TEST("testImplicitShared") {
orb.Listen(0);
int port = orb.GetListenPort();
ASSERT_TRUE(port != 0);
- orb.Start();
char tmp[64];
snprintf(tmp, sizeof(tmp), "tcp/localhost:%d", port);
@@ -255,7 +256,6 @@ TEST("testImplicitShared") {
}
req->SubRef();
target->SubRef();
- orb.ShutDown(true);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index f76e66c2af6..b91b9fa4b39 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -37,12 +37,12 @@ struct RPC : public FRT_Invokable
TEST("info") {
RPC rpc;
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb = server.supervisor();
char spec[64];
rpc.Init(&orb);
ASSERT_TRUE(orb.Listen("tcp/0"));
sprintf(spec, "tcp/localhost:%d", orb.GetListenPort());
- ASSERT_TRUE(orb.Start());
FRT_Target *target = orb.GetTarget(spec);
FRT_RPCRequest *local_info = orb.AllocRPCRequest();
@@ -65,7 +65,6 @@ TEST("info") {
target->SubRef();
local_info->SubRef();
remote_info->SubRef();
- orb.ShutDown(true);
};
TEST("size of important objects")
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index e509223c005..d5c7e0847f7 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -9,57 +9,25 @@
#include <vespa/fnet/connector.h>
#include <vespa/fastos/thread.h>
-FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport,
- FastOS_ThreadPool *threadPool)
+FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport)
: _transport(transport),
- _threadPool(threadPool),
- _standAlone(false),
_packetFactory(),
_packetStreamer(&_packetFactory),
_connector(nullptr),
_reflectionManager(),
_rpcHooks(&_reflectionManager),
_connHooks(*this),
- _methodMismatchHook(nullptr)
+ _methodMismatchHook()
{
_rpcHooks.InitRPC(this);
}
-FRT_Supervisor::FRT_Supervisor(vespalib::CryptoEngine::SP crypto,
- uint32_t threadStackSize,
- uint32_t maxThreads)
- : _transport(nullptr),
- _threadPool(nullptr),
- _standAlone(true),
- _packetFactory(),
- _packetStreamer(&_packetFactory),
- _connector(nullptr),
- _reflectionManager(),
- _rpcHooks(&_reflectionManager),
- _connHooks(*this),
- _methodMismatchHook(nullptr)
-{
- _transport = new FNET_Transport(std::move(crypto), 1);
- assert(_transport != nullptr);
- if (threadStackSize > 0) {
- _threadPool = new FastOS_ThreadPool(threadStackSize, maxThreads);
- assert(_threadPool != nullptr);
- }
- _rpcHooks.InitRPC(this);
-}
-
-
FRT_Supervisor::~FRT_Supervisor()
{
- if (_standAlone) {
- delete _transport;
- delete _threadPool;
- }
if (_connector != nullptr) {
_connector->SubRef();
}
- delete _methodMismatchHook;
}
FNET_Scheduler *
@@ -134,43 +102,35 @@ FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein)
void
-FRT_Supervisor::SetSessionInitHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
_connHooks.SetSessionInitHook(method, handler);
}
void
-FRT_Supervisor::SetSessionDownHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
_connHooks.SetSessionDownHook(method, handler);
}
void
-FRT_Supervisor::SetSessionFiniHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::SetSessionFiniHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
_connHooks.SetSessionFiniHook(method, handler);
}
void
-FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
- delete _methodMismatchHook;
- _methodMismatchHook = new FRT_Method("frt.hook.methodMismatch", "*", "*",
- method, handler);
- assert(_methodMismatchHook != nullptr);
+ _methodMismatchHook = std::make_unique<FRT_Method>("frt.hook.methodMismatch", "*", "*", method, handler);
}
void
-FRT_Supervisor::InvokeVoid(FNET_Connection *conn,
- FRT_RPCRequest *req)
+FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req)
{
if (conn != nullptr) {
FNET_Channel *ch = conn->OpenChannel();
@@ -183,11 +143,7 @@ FRT_Supervisor::InvokeVoid(FNET_Connection *conn,
void
-FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler,
- FNET_Connection *conn,
- FRT_RPCRequest *req,
- double timeout,
- FRT_IRequestWait *waiter)
+FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout, FRT_IRequestWait *waiter)
{
uint32_t chid;
FNET_Packet *packet = req->CreateRequestPacket(true);
@@ -209,10 +165,7 @@ FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler,
void
-FRT_Supervisor::InvokeSync(SchedulerPtr scheduler,
- FNET_Connection *conn,
- FRT_RPCRequest *req,
- double timeout)
+FRT_Supervisor::InvokeSync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout)
{
FRT_SingleReqWait waiter;
InvokeAsync(scheduler, conn, req, timeout, &waiter);
@@ -250,7 +203,7 @@ FNET_IPacketHandler::HP_RetCode
FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context)
{
uint32_t pcode = packet->GetPCODE() & 0xffff; // remove flags
- FRT_RPCRequest *req = (FRT_RPCRequest *) context._value.VOIDP;
+ auto *req = (FRT_RPCRequest *) context._value.VOIDP;
FRT_RPCInvoker *invoker = nullptr;
bool noReply = false;
@@ -265,9 +218,9 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context)
if (req->IsError()) {
if (req->GetErrorCode() != FRTE_RPC_BAD_REQUEST
- && _methodMismatchHook != nullptr)
+ && _methodMismatchHook)
{
- invoker->ForceMethod(_methodMismatchHook);
+ invoker->ForceMethod(_methodMismatchHook.get());
return (invoker->Invoke()) ?
FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL;
}
@@ -282,40 +235,6 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context)
}
}
-
-bool
-FRT_Supervisor::Start()
-{
- assert(_standAlone);
- if (_threadPool == nullptr)
- return false;
- return _transport->Start(_threadPool);
-}
-
-
-void
-FRT_Supervisor::Main()
-{
- assert(_standAlone);
- _transport->Main();
-}
-
-
-void
-FRT_Supervisor::ShutDown(bool waitFinished)
-{
- assert(_standAlone);
- _transport->ShutDown(waitFinished);
-}
-
-
-void
-FRT_Supervisor::WaitFinished()
-{
- assert(_standAlone);
- _transport->WaitFinished();
-}
-
//----------------------------------------------------
// RPC Hooks
//----------------------------------------------------
@@ -403,57 +322,38 @@ FRT_Supervisor::RPCHooks::RPC_GetMethodInfo(FRT_RPCRequest *req)
FRT_Supervisor::ConnHooks::ConnHooks(FRT_Supervisor &parent)
: _parent(parent),
- _sessionInitHook(nullptr),
- _sessionDownHook(nullptr),
- _sessionFiniHook(nullptr)
+ _sessionInitHook(),
+ _sessionDownHook(),
+ _sessionFiniHook()
{
}
-FRT_Supervisor::ConnHooks::~ConnHooks()
-{
- delete _sessionInitHook;
- delete _sessionDownHook;
- delete _sessionFiniHook;
-}
-
+FRT_Supervisor::ConnHooks::~ConnHooks() = default;
void
-FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
- delete _sessionInitHook;
- _sessionInitHook = new FRT_Method("frt.hook.sessionInit", "", "",
- method, handler);
- assert(_sessionInitHook != nullptr);
+ _sessionInitHook = std::make_unique<FRT_Method>("frt.hook.sessionInit", "", "", method, handler);
}
void
-FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
- delete _sessionDownHook;
- _sessionDownHook = new FRT_Method("frt.hook.sessionDown", "", "",
- method, handler);
- assert(_sessionDownHook != nullptr);
+ _sessionDownHook = std::make_unique<FRT_Method>("frt.hook.sessionDown", "", "", method, handler);
}
void
-FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method,
- FRT_Invokable *handler)
+FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method, FRT_Invokable *handler)
{
- delete _sessionFiniHook;
- _sessionFiniHook = new FRT_Method("frt.hook.sessionFini", "", "",
- method, handler);
- assert(_sessionFiniHook != nullptr);
+ _sessionFiniHook = std::make_unique<FRT_Method>("frt.hook.sessionFini", "", "", method, handler);
}
void
-FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook,
- FNET_Connection *conn)
+FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook, FNET_Connection *conn)
{
FRT_RPCRequest *req = _parent.AllocRPCRequest();
req->SetMethodName(hook->GetName());
@@ -466,8 +366,8 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel)
{
FNET_Connection *conn = channel->GetConnection();
conn->SetCleanupHandler(this);
- if (_sessionInitHook != nullptr) {
- InvokeHook(_sessionInitHook, conn);
+ if (_sessionInitHook) {
+ InvokeHook(_sessionInitHook.get(), conn);
}
channel->SetHandler(this);
channel->SetContext(channel);
@@ -476,16 +376,15 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel)
FNET_IPacketHandler::HP_RetCode
-FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet,
- FNET_Context context)
+FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, FNET_Context context)
{
if (!packet->IsChannelLostCMD()) {
packet->Free();
return FNET_KEEP_CHANNEL;
}
FNET_Channel *ch = context._value.CHANNEL;
- if (_sessionDownHook != nullptr) {
- InvokeHook(_sessionDownHook, ch->GetConnection());
+ if (_sessionDownHook) {
+ InvokeHook(_sessionDownHook.get(), ch->GetConnection());
}
return FNET_FREE_CHANNEL;
}
@@ -494,8 +393,8 @@ FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet,
void
FRT_Supervisor::ConnHooks::Cleanup(FNET_Connection *conn)
{
- if (_sessionFiniHook != nullptr) {
- InvokeHook(_sessionFiniHook, conn);
+ if (_sessionFiniHook) {
+ InvokeHook(_sessionFiniHook.get(), conn);
}
}
@@ -506,3 +405,33 @@ FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_Transport *transport)
FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_thread)
: ptr(transport_thread->GetScheduler())
{ }
+
+namespace fnet::frt {
+
+StandaloneFRT::StandaloneFRT()
+ : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
+{
+ _transport->Start(_threadPool.get());
+}
+
+StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto)
+ : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>(std::move(crypto), 1)),
+ _supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
+{
+ _transport->Start(_threadPool.get());
+}
+
+StandaloneFRT::~StandaloneFRT()
+{
+ _transport->ShutDown(true);
+}
+
+void
+StandaloneFRT::wait_finished() const {
+ _transport->WaitFinished();
+}
+
+}
diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h
index dc7fb496239..1247cb08402 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.h
+++ b/fnet/src/vespa/fnet/frt/supervisor.h
@@ -9,7 +9,6 @@
#include <vespa/fnet/ipackethandler.h>
#include <vespa/fnet/connection.h>
#include <vespa/fnet/simplepacketstreamer.h>
-#include <vespa/vespalib/net/crypto_engine.h>
class FNET_Transport;
class FRT_Target;
@@ -18,6 +17,9 @@ class FNET_Scheduler;
class FRT_RPCInvoker;
class FRT_IRequestWait;
+namespace vespalib { class CryptoEngine; }
+
+
class FRT_Supervisor : public FNET_IServerAdapter,
public FNET_IPacketHandler
{
@@ -26,12 +28,10 @@ public:
{
private:
FRT_ReflectionManager *_reflectionManager;
-
- RPCHooks(const RPCHooks &);
- RPCHooks &operator=(const RPCHooks &);
-
public:
- RPCHooks(FRT_ReflectionManager *reflect)
+ RPCHooks(const RPCHooks &) = delete;
+ RPCHooks &operator=(const RPCHooks &) = delete;
+ explicit RPCHooks(FRT_ReflectionManager *reflect)
: _reflectionManager(reflect) {}
void InitRPC(FRT_Supervisor *supervisor);
@@ -45,17 +45,15 @@ public:
public FNET_IPacketHandler
{
private:
- FRT_Supervisor &_parent;
- FRT_Method *_sessionInitHook;
- FRT_Method *_sessionDownHook;
- FRT_Method *_sessionFiniHook;
-
- ConnHooks(const ConnHooks &);
- ConnHooks &operator=(const ConnHooks &);
-
+ FRT_Supervisor &_parent;
+ std::unique_ptr<FRT_Method> _sessionInitHook;
+ std::unique_ptr<FRT_Method> _sessionDownHook;
+ std::unique_ptr<FRT_Method> _sessionFiniHook;
public:
- ConnHooks(FRT_Supervisor &parent);
- ~ConnHooks();
+ ConnHooks(const ConnHooks &) = delete;
+ ConnHooks &operator=(const ConnHooks &) = delete;
+ explicit ConnHooks(FRT_Supervisor &parent);
+ ~ConnHooks() override;
void SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler);
void SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler);
@@ -67,32 +65,23 @@ public:
};
private:
- FNET_Transport *_transport;
- FastOS_ThreadPool *_threadPool;
- bool _standAlone;
-
- FRT_PacketFactory _packetFactory;
- FNET_SimplePacketStreamer _packetStreamer;
- FNET_Connector *_connector;
- FRT_ReflectionManager _reflectionManager;
- RPCHooks _rpcHooks;
- ConnHooks _connHooks;
- FRT_Method *_methodMismatchHook;
-
- FRT_Supervisor(const FRT_Supervisor &);
- FRT_Supervisor &operator=(const FRT_Supervisor &);
+ FNET_Transport *_transport;
+ FRT_PacketFactory _packetFactory;
+ FNET_SimplePacketStreamer _packetStreamer;
+ FNET_Connector *_connector;
+ FRT_ReflectionManager _reflectionManager;
+ RPCHooks _rpcHooks;
+ ConnHooks _connHooks;
+ std::unique_ptr<FRT_Method> _methodMismatchHook;
public:
- FRT_Supervisor(FNET_Transport *transport, FastOS_ThreadPool *threadPool);
- FRT_Supervisor(vespalib::CryptoEngine::SP crypto, uint32_t threadStackSize = 65000, uint32_t maxThreads = 0);
- FRT_Supervisor(uint32_t threadStackSize = 65000, uint32_t maxThreads = 0)
- : FRT_Supervisor(vespalib::CryptoEngine::get_default(), threadStackSize, maxThreads) {}
- virtual ~FRT_Supervisor();
+ explicit FRT_Supervisor(FNET_Transport *transport);
+ FRT_Supervisor(const FRT_Supervisor &) = delete;
+ FRT_Supervisor &operator=(const FRT_Supervisor &) = delete;
+ ~FRT_Supervisor() override;
- bool StandAlone() { return _standAlone; }
FNET_Transport *GetTransport() { return _transport; }
FNET_Scheduler *GetScheduler();
- FastOS_ThreadPool *GetThreadPool() { return _threadPool; }
FRT_ReflectionManager *GetReflectionManager() { return &_reflectionManager; }
bool Listen(const char *spec);
@@ -100,8 +89,7 @@ public:
uint32_t GetListenPort() const;
FRT_Target *GetTarget(const char *spec);
- FRT_Target *Get2WayTarget(const char *spec,
- FNET_Context connContext = FNET_Context());
+ FRT_Target *Get2WayTarget(const char *spec, FNET_Context connContext = FNET_Context());
FRT_Target *GetTarget(int port);
FRT_RPCRequest *AllocRPCRequest(FRT_RPCRequest *tradein = nullptr);
@@ -120,17 +108,10 @@ public:
};
// methods for performing rpc invocations
- static void InvokeVoid(FNET_Connection *conn,
- FRT_RPCRequest *req);
- static void InvokeAsync(SchedulerPtr scheduler,
- FNET_Connection *conn,
- FRT_RPCRequest *req,
- double timeout,
- FRT_IRequestWait *waiter);
- static void InvokeSync(SchedulerPtr scheduler,
- FNET_Connection *conn,
- FRT_RPCRequest *req,
- double timeout);
+ static void InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req);
+ static void InvokeSync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout);
+ static void InvokeAsync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout, FRT_IRequestWait *waiter);
+
// FNET ServerAdapter Interface
bool InitAdminChannel(FNET_Channel *channel) override;
@@ -138,11 +119,26 @@ public:
// Packet Handling
HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override;
+};
- // Methods for controlling transport object in standalone mode
- bool Start();
- void Main();
- void ShutDown(bool waitFinished);
- void WaitFinished();
+namespace fnet::frt {
+
+/**
+ * This is a simple class that makes it easy to test RPC.
+ * Normally you do not want use it in production code as it hides your possibilites and responsibilities.
+ */
+class StandaloneFRT {
+public:
+ StandaloneFRT();
+ explicit StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto);
+ ~StandaloneFRT();
+ FRT_Supervisor & supervisor() { return *_supervisor; }
+ // TODO Remove this method as it is a relic from the ancient non-threaded world.
+ void wait_finished() const;
+private:
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _supervisor;
};
+}
diff --git a/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp b/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp
index 89d8cd881a8..b45d7986842 100644
--- a/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp
+++ b/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp
@@ -38,8 +38,7 @@ public:
void rpc_inc(FRT_RPCRequest *req)
{
- req->GetReturn()->AddInt32(req->GetParams()->GetValue(0)._intval32
- + 1);
+ req->GetReturn()->AddInt32(req->GetParams()->GetValue(0)._intval32 + 1);
}
void rpc_blob(FRT_RPCRequest *req)
@@ -79,10 +78,10 @@ App::Main()
printf("usage: %s <listenspec>\n", _argv[0]);
return 1;
}
- FRT_Supervisor orb;
- Server server(&orb);
- orb.Listen(_argv[1]);
- orb.Main();
+ fnet::frt::StandaloneFRT frtServer;
+ Server server(&frtServer.supervisor());
+ frtServer.supervisor().Listen(_argv[1]);
+ frtServer.wait_finished();
return 0;
}
diff --git a/jrt_test/src/tests/echo/echo-client.cpp b/jrt_test/src/tests/echo/echo-client.cpp
index 18e5892ef71..c1ffe29e1ce 100644
--- a/jrt_test/src/tests/echo/echo-client.cpp
+++ b/jrt_test/src/tests/echo/echo-client.cpp
@@ -12,9 +12,9 @@ public:
printf("usage : echo_client <connectspec>\n");
return 1;
}
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
- supervisor.Start();
FRT_Target *target = supervisor.GetTarget(_argv[1]);
FRT_RPCRequest *req = supervisor.AllocRPCRequest();
FRT_Values *args = req->GetParams();
@@ -77,7 +77,6 @@ public:
printf("Return values != parameters.\n");
}
req->SubRef();
- supervisor.ShutDown(true);
return 0;
}
};
diff --git a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
index a465316343d..40c54980c11 100644
--- a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
+++ b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp
@@ -9,16 +9,16 @@ public:
void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor)
{
- if ((*req) != NULL)
+ if ((*req) != nullptr)
(*req)->SubRef();
(*req) = supervisor->AllocRPCRequest();
}
void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2)
{
- if (r1 != NULL)
+ if (r1 != nullptr)
r1->SubRef();
- if (r2 != NULL)
+ if (r2 != nullptr)
r2->SubRef();
}
@@ -72,11 +72,11 @@ public:
}
bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0);
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
FRT_Target *target = supervisor.GetTarget(_argv[1]);
- FRT_RPCRequest *m_list = NULL;
- FRT_RPCRequest *info = NULL;
- supervisor.Start();
+ FRT_RPCRequest *m_list = nullptr;
+ FRT_RPCRequest *info = nullptr;
for (int i = 0; i < 50; i++) {
GetReq(&info, &supervisor);
@@ -93,7 +93,6 @@ public:
fprintf(stderr, "Error talking to %s\n", _argv[1]);
info->Print();
FreeReqs(m_list, info);
- supervisor.ShutDown(true);
return 1;
}
@@ -131,7 +130,6 @@ public:
}
FreeReqs(m_list, info);
target->SubRef();
- supervisor.ShutDown(true);
return 0;
}
};
diff --git a/jrt_test/src/tests/mockup-invoke/mockup-server.cpp b/jrt_test/src/tests/mockup-invoke/mockup-server.cpp
index 8456bee1e41..4e1d57e2ce6 100644
--- a/jrt_test/src/tests/mockup-invoke/mockup-server.cpp
+++ b/jrt_test/src/tests/mockup-invoke/mockup-server.cpp
@@ -51,10 +51,10 @@ App::Main()
printf("usage: %s <listenspec>\n", _argv[0]);
return 1;
}
- FRT_Supervisor orb;
- MockupServer server(&orb);
- orb.Listen(_argv[1]);
- orb.Main();
+ fnet::frt::StandaloneFRT orb;
+ MockupServer server(&orb.supervisor());
+ orb.supervisor().Listen(_argv[1]);
+ orb.wait_finished();
return 0;
}
diff --git a/jrt_test/src/tests/rpc-error/test-errors.cpp b/jrt_test/src/tests/rpc-error/test-errors.cpp
index 22e1007a517..c30af8ea579 100644
--- a/jrt_test/src/tests/rpc-error/test-errors.cpp
+++ b/jrt_test/src/tests/rpc-error/test-errors.cpp
@@ -5,22 +5,20 @@
class TestErrors : public vespalib::TestApp
{
private:
+ fnet::frt::StandaloneFRT server;
FRT_Supervisor *client;
FRT_Target *target;
public:
void init(const char *spec) {
- client = new FRT_Supervisor;
+ client = & server.supervisor();
target = client->GetTarget(spec);
- client->Start();
}
void fini() {
- target->SubRef();
- target = NULL;
- client->ShutDown(true);
- delete client;
- client = NULL;
+ target->SubRef();
+ target = nullptr;
+ client = nullptr;
}
void testNoError();
diff --git a/logd/src/logd/config_subscriber.cpp b/logd/src/logd/config_subscriber.cpp
index ddcfcd38aae..7981e13a9a1 100644
--- a/logd/src/logd/config_subscriber.cpp
+++ b/logd/src/logd/config_subscriber.cpp
@@ -100,12 +100,11 @@ ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri)
_handle(),
_has_available(false),
_need_new_forwarder(true),
- _supervisor()
+ _server()
{
_handle = _subscriber.subscribe<LogdConfig>(configUri.getConfigId());
_subscriber.nextConfig(0);
configure(_handle->getConfig());
- _supervisor.Start();
LOG(debug, "got logServer %s", _logserver_host.c_str());
LOG(debug, "got handle %p", _handle.get());
@@ -113,7 +112,6 @@ ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri)
ConfigSubscriber::~ConfigSubscriber()
{
- _supervisor.ShutDown(true);
LOG(debug, "forget logServer %s", _logserver_host.c_str());
LOG(debug, "done ~ConfSub()");
}
@@ -123,7 +121,7 @@ ConfigSubscriber::make_forwarder(Metrics& metrics)
{
std::unique_ptr<Forwarder> result;
if (_use_logserver) {
- result = std::make_unique<RpcForwarder>(metrics, _forward_filter, _supervisor, _logserver_host,
+ result = std::make_unique<RpcForwarder>(metrics, _forward_filter, _server.supervisor(), _logserver_host,
_logserver_rpc_port, 60.0, 100);
} else {
result = std::make_unique<EmptyForwarder>(metrics);
diff --git a/logd/src/logd/config_subscriber.h b/logd/src/logd/config_subscriber.h
index 4d0938dfa3b..a60dac4a367 100644
--- a/logd/src/logd/config_subscriber.h
+++ b/logd/src/logd/config_subscriber.h
@@ -28,8 +28,7 @@ private:
config::ConfigHandle<cloud::config::log::LogdConfig>::UP _handle;
bool _has_available;
bool _need_new_forwarder;
- FRT_Supervisor _supervisor;
-
+ fnet::frt::StandaloneFRT _server;
public:
bool checkAvailable();
void latch();
diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
index 5b70434e793..92a641f639e 100644
--- a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
+++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
@@ -31,7 +31,7 @@ decode_log_request(FRT_Values& src, ProtoConverter::ProtoLogRequest& dst)
std::string garbage("garbage");
struct RpcServer : public FRT_Invokable {
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
int request_count;
std::vector<std::string> messages;
bool reply_with_error;
@@ -39,23 +39,20 @@ struct RpcServer : public FRT_Invokable {
public:
RpcServer()
- : supervisor(),
+ : server(),
request_count(0),
messages(),
reply_with_error(false),
reply_with_proto_response(true)
{
- supervisor.Listen(0);
- FRT_ReflectionBuilder builder(&supervisor);
+ server.supervisor().Listen(0);
+ FRT_ReflectionBuilder builder(&server.supervisor());
builder.DefineMethod("vespa.logserver.archiveLogMessages", "bix", "bix",
FRT_METHOD(RpcServer::rpc_archive_log_messages), this);
- supervisor.Start();
- }
- ~RpcServer() {
- supervisor.ShutDown(true);
}
+ ~RpcServer() = default;
int get_listen_port() {
- return supervisor.GetListenPort();
+ return server.supervisor().GetListenPort();
}
void rpc_archive_log_messages(FRT_RPCRequest* request) {
ProtoConverter::ProtoLogRequest proto_request;
@@ -96,17 +93,14 @@ struct MockMetricsManager : public DummyMetricsManager {
class ClientSupervisor {
private:
- FRT_Supervisor _supervisor;
+ fnet::frt::StandaloneFRT _client;
public:
ClientSupervisor()
- : _supervisor()
+ : _client()
{
- _supervisor.Start();
- }
- ~ClientSupervisor() {
- _supervisor.ShutDown(true);
}
- FRT_Supervisor& get() { return _supervisor; }
+ ~ClientSupervisor() = default;
+ FRT_Supervisor& get() { return _client.supervisor(); }
};
diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp
index 46a8a6518f0..1706da3b55f 100644
--- a/messagebus/src/tests/shutdown/shutdown.cpp
+++ b/messagebus/src/tests/shutdown/shutdown.cpp
@@ -37,21 +37,18 @@ TEST_APPHOOK(Test);
void
Test::requireThatListenFailedIsExceptionSafe()
{
- FRT_Supervisor orb;
- ASSERT_TRUE(orb.Listen(0));
- ASSERT_TRUE(orb.Start());
+ fnet::frt::StandaloneFRT orb;
+ ASSERT_TRUE(orb.supervisor().Listen(0));
Slobrok slobrok;
try {
TestServer bar(MessageBusParams(),
RPCNetworkParams(slobrok.config())
- .setListenPort(orb.GetListenPort()));
+ .setListenPort(orb.supervisor().GetListenPort()));
EXPECT_TRUE(false);
} catch (vespalib::Exception &e) {
- EXPECT_EQUAL("Failed to start network.",
- e.getMessage());
+ EXPECT_EQUAL("Failed to start network.", e.getMessage());
}
- orb.ShutDown(true);
}
void
diff --git a/messagebus/src/tests/targetpool/targetpool.cpp b/messagebus/src/tests/targetpool/targetpool.cpp
index 0ae70daf489..0e0e566f2be 100644
--- a/messagebus/src/tests/targetpool/targetpool.cpp
+++ b/messagebus/src/tests/targetpool/targetpool.cpp
@@ -38,8 +38,8 @@ Test::Main()
TestServer srv3(Identity("srv3"), RoutingSpec(), slobrok);
RPCServiceAddress adr3("", srv3.mb.getConnectionSpec());
- FRT_Supervisor orb(1024u, 1);
- ASSERT_TRUE(orb.Start());
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb = server.supervisor();
std::unique_ptr<PoolTimer> ptr(new PoolTimer());
PoolTimer &timer = *ptr;
RPCTargetPool pool(std::move(ptr), 0.666);
@@ -92,7 +92,5 @@ Test::Main()
pool.flushTargets(false);
EXPECT_EQUAL(0u, pool.size());
- orb.ShutDown(true);
-
TEST_DONE();
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 358c9ebdeac..be25a73ece2 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -112,7 +112,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
_transport(std::make_unique<FNET_Transport>()),
- _orb(std::make_unique<FRT_Supervisor>(_transport.get(), nullptr)),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
_targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
_targetPoolTask(_scheduler, *_targetPool),
diff --git a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
index 416a25cfb7b..2e097e7141f 100644
--- a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
+++ b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp
@@ -20,19 +20,19 @@ private:
App(const App &);
App& operator=(const App &);
- FRT_Supervisor *_supervisor;
+ std::unique_ptr<fnet::frt::StandaloneFRT> _frt;
FRT_Target *_target;
FRT_RPCRequest *_req;
public:
- App() : _supervisor(NULL),
- _target(NULL),
- _req(NULL) {}
+ App() : _frt(),
+ _target(nullptr),
+ _req(nullptr) {}
virtual ~App()
{
- assert(_supervisor == NULL);
- assert(_target == NULL);
- assert(_req == NULL);
+ assert(!_frt);
+ assert(_target == nullptr);
+ assert(_req == nullptr);
}
int usage()
@@ -49,14 +49,13 @@ public:
void initRPC()
{
- _supervisor = new FRT_Supervisor();
- _req = _supervisor->AllocRPCRequest();
- _supervisor->Start();
+ _frt = std::make_unique<fnet::frt::StandaloneFRT>();
+ _req = _frt->supervisor().AllocRPCRequest();
}
void invokeRPC(bool print, double timeout=5.0)
{
- if (_req == NULL)
+ if (_req == nullptr)
return;
_target->InvokeSync(_req, timeout);
@@ -66,18 +65,16 @@ public:
void finiRPC()
{
- if (_req != NULL) {
+ if (_req != nullptr) {
_req->SubRef();
- _req = NULL;
+ _req = nullptr;
}
- if (_target != NULL) {
+ if (_target != nullptr) {
_target->SubRef();
- _target = NULL;
+ _target = nullptr;
}
- if (_supervisor != NULL) {
- _supervisor->ShutDown(true);
- delete _supervisor;
- _supervisor = NULL;
+ if (_frt) {
+ _frt.reset();
}
}
@@ -115,7 +112,7 @@ public:
try {
slobrok::ConfiguratorFactory sbcfg("admin/slobrok.0");
- slobrok::api::MirrorAPI sbmirror(*_supervisor, sbcfg);
+ slobrok::api::MirrorAPI sbmirror(_frt->supervisor(), sbcfg);
for (int timeout = 1; timeout < 20; timeout++) {
if (!sbmirror.ready()) {
FastOS_Thread::Sleep(50*timeout);
@@ -167,7 +164,7 @@ public:
try {
slobrok::ConfiguratorFactory sbcfg("admin/slobrok.0");
- slobrok::api::MirrorAPI sbmirror(*_supervisor, sbcfg);
+ slobrok::api::MirrorAPI sbmirror(_frt->supervisor(), sbcfg);
for (int timeout = 1; timeout < 20; timeout++) {
if (!sbmirror.ready()) {
FastOS_Thread::Sleep(50*timeout);
@@ -249,9 +246,9 @@ public:
}
if (port != 0) {
- _target = _supervisor->GetTarget(port);
+ _target = _frt->supervisor().GetTarget(port);
} else {
- _target = _supervisor->GetTarget(spec.c_str());
+ _target = _frt->supervisor().GetTarget(spec.c_str());
}
bool invoked = false;
@@ -350,7 +347,7 @@ void
App::monitorLoop()
{
for (;;) {
- FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
+ FRT_RPCRequest *req = _frt->supervisor().AllocRPCRequest();
req->SetMethodName("pandora.rtc.getIncrementalState");
FRT_Values &params = *req->GetParams();
params.AddInt32(2000);
@@ -365,7 +362,7 @@ App::monitorLoop()
FRT_Value &names = rvals.GetValue(0);
FRT_Value &values = rvals.GetValue(1);
struct timeval tnow;
- gettimeofday(&tnow, NULL);
+ gettimeofday(&tnow, nullptr);
for (unsigned int i = 0;
i < names._string_array._len &&
diff --git a/searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp b/searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp
index eaff3b90d78..437482dddd2 100644
--- a/searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp
+++ b/searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp
@@ -10,7 +10,7 @@ extern char FastS_VersionTag[];
FastS_RPC::FastS_RPC(FastS_AppContext *appCtx) :
_appCtx(appCtx),
_transport(),
- _supervisor(&_transport, _appCtx->GetThreadPool()),
+ _supervisor(&_transport),
_sbregister(_supervisor, slobrok::ConfiguratorFactory("admin/slobrok.0"))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
index 2d63f192189..bcf767bb873 100644
--- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
@@ -6,6 +6,7 @@
#include <vespa/searchcore/proton/matchengine/matchengine.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/transport.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.rtchooks");
@@ -195,7 +196,8 @@ RPCHooksBase::Params::~Params() = default;
RPCHooksBase::RPCHooksBase(Params &params)
: _proton(params.proton),
_docsumByRPC(new DocsumByRPC(_proton.getDocsumBySlime())),
- _orb(std::make_unique<FRT_Supervisor>()),
+ _transport(std::make_unique<FNET_Transport>()),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_proto_rpc_adapter(std::make_unique<ProtoRpcAdapter>(
_proton.get_search_server(),
_proton.get_docsum_server(),
@@ -212,7 +214,7 @@ RPCHooksBase::open(Params & params)
initRPC();
_regAPI.registerName((params.identity + "/realtimecontroller").c_str());
_orb->Listen(params.rtcPort);
- _orb->Start();
+ _transport->Start(&_proton.getThreadPool());
LOG(debug, "started monitoring interface");
}
@@ -222,7 +224,7 @@ void
RPCHooksBase::close()
{
LOG(info, "shutting down monitoring interface");
- _orb->ShutDown(true);
+ _transport->ShutDown(true);
_executor.shutdown();
{
std::lock_guard<std::mutex> guard(_stateLock);
diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h
index ad0a69fcd55..f16237381d6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h
+++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h
@@ -12,6 +12,8 @@
#include <mutex>
#include <condition_variable>
+class FNET_Transport;
+
namespace proton {
class Proton;
@@ -64,6 +66,7 @@ private:
Proton & _proton;
std::unique_ptr<DocsumByRPC> _docsumByRPC;
+ std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _orb;
std::unique_ptr<ProtoRpcAdapter> _proto_rpc_adapter;
slobrok::api::RegisterAPI _regAPI;
diff --git a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
index 60cf1af13a0..6747e4e741e 100644
--- a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
+++ b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp
@@ -62,23 +62,20 @@ struct MyMonitorServer : MonitorServer {
};
struct ProtoRpcAdapterTest : ::testing::Test {
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT server;
MySearchServer search;
MyDocsumServer docsum;
MyMonitorServer monitor;
ProtoRpcAdapter adapter;
ProtoRpcAdapterTest()
- : orb(), adapter(search, docsum, monitor, orb)
+ : server(), adapter(search, docsum, monitor, server.supervisor())
{
- orb.Listen(0);
- orb.Start();
+ server.supervisor().Listen(0);
}
FRT_Target *connect() {
- return orb.GetTarget(orb.GetListenPort());
- }
- ~ProtoRpcAdapterTest() {
- orb.ShutDown(true);
+ return server.supervisor().GetTarget(server.supervisor().GetListenPort());
}
+ ~ProtoRpcAdapterTest() = default;
};
//-----------------------------------------------------------------------------
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
index 767c8b45e10..2c6c1e249f4 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
@@ -4,6 +4,8 @@
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fastos/thread.h>
#include <thread>
#include <vespa/log/log.h>
@@ -47,19 +49,21 @@ TransLogClient::TransLogClient(const vespalib::string & rpcTarget) :
_executor(1, 128 * 1024, translogclient_rpc_callback),
_rpcTarget(rpcTarget),
_sessions(),
- _supervisor(std::make_unique<FRT_Supervisor>()),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
_target(NULL)
{
reconnect();
exportRPC(*_supervisor);
- _supervisor->Start();
+ _transport->Start(_threadPool.get());
}
TransLogClient::~TransLogClient()
{
disconnect();
_executor.shutdown().sync();
- _supervisor->ShutDown(true);
+ _transport->ShutDown(true);
}
bool TransLogClient::reconnect()
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
index 267d6e3b0ed..38c30cd5b4c 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
@@ -10,6 +10,7 @@
#include <map>
#include <vector>
+class FNET_Transport;
class FRT_Supervisor;
class FRT_Target;
@@ -122,7 +123,9 @@ private:
SessionMap _sessions;
//Brute force lock for subscriptions. For multithread safety.
vespalib::Lock _lock;
- std::unique_ptr<FRT_Supervisor> _supervisor;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _supervisor;
FRT_Target * _target;
};
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 4b3e7bddb07..6d11ab1f5eb 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -6,6 +6,7 @@
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fnet/task.h>
+#include <vespa/fnet/transport.h>
#include <fstream>
#include <vespa/log/log.h>
@@ -90,8 +91,9 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
_defaultCrcType(defaultCrcType),
_commitExecutor(maxThreads, 128*1024),
_sessionExecutor(maxThreads, 128*1024),
- _threadPool(8192, 1),
- _supervisor(std::make_unique<FRT_Supervisor>()),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
_domains(),
_reqQ(),
_fileHeaderContext(fileHeaderContext)
@@ -119,7 +121,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
bool listenOk(false);
for (int i(600); !listenOk && i; i--) {
if (_supervisor->Listen(listenSpec)) {
- _supervisor->Start();
+ _transport->Start(_threadPool.get());
listenOk = true;
} else {
LOG(warning, "Failed listening at port %s trying for %d seconds more.", listenSpec, i);
@@ -135,7 +137,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
} else {
throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno));
}
- start(_threadPool);
+ start(*_threadPool);
}
TransLogServer::~TransLogServer()
@@ -146,7 +148,7 @@ TransLogServer::~TransLogServer()
_commitExecutor.sync();
_sessionExecutor.shutdown();
_sessionExecutor.sync();
- _supervisor->ShutDown(true);
+ _transport->ShutDown(true);
}
bool
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 8aedfef6d8d..0d65f36e07d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -10,6 +10,7 @@
class FRT_Supervisor;
+class FNET_Transport;
namespace search::common { class FileHeaderContext; }
@@ -85,7 +86,8 @@ private:
const DomainPart::Crc _defaultCrcType;
vespalib::ThreadStackExecutor _commitExecutor;
vespalib::ThreadStackExecutor _sessionExecutor;
- FastOS_ThreadPool _threadPool;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
std::unique_ptr<FRT_Supervisor> _supervisor;
DomainList _domains;
mutable std::mutex _lock; // Protects _domains
diff --git a/slobrok/src/apps/check_slobrok/check_slobrok.cpp b/slobrok/src/apps/check_slobrok/check_slobrok.cpp
index ed72c459e44..0342cccb221 100644
--- a/slobrok/src/apps/check_slobrok/check_slobrok.cpp
+++ b/slobrok/src/apps/check_slobrok/check_slobrok.cpp
@@ -13,14 +13,14 @@ LOG_SETUP("check_slobrok");
class Slobrok_Checker : public FastOS_Application
{
private:
- std::unique_ptr<FRT_Supervisor> _supervisor;
+ std::unique_ptr<fnet::frt::StandaloneFRT> _server;
FRT_Target *_target;
Slobrok_Checker(const Slobrok_Checker &);
Slobrok_Checker &operator=(const Slobrok_Checker &);
public:
- Slobrok_Checker() : _supervisor(), _target(nullptr) {}
+ Slobrok_Checker() : _server(), _target(nullptr) {}
virtual ~Slobrok_Checker();
int usage();
void initRPC(const char *spec);
@@ -30,7 +30,7 @@ public:
Slobrok_Checker::~Slobrok_Checker()
{
- LOG_ASSERT( !_supervisor);
+ LOG_ASSERT( !_server);
LOG_ASSERT(_target == nullptr);
}
@@ -46,9 +46,8 @@ Slobrok_Checker::usage()
void
Slobrok_Checker::initRPC(const char *spec)
{
- _supervisor = std::make_unique<FRT_Supervisor>();
- _target = _supervisor->GetTarget(spec);
- _supervisor->Start();
+ _server = std::make_unique<fnet::frt::StandaloneFRT>();
+ _target = _server->supervisor().GetTarget(spec);
}
@@ -59,9 +58,8 @@ Slobrok_Checker::finiRPC()
_target->SubRef();
_target = nullptr;
}
- if (_supervisor) {
- _supervisor->ShutDown(true);
- _supervisor.reset();
+ if (_server) {
+ _server.reset();
}
}
@@ -82,7 +80,7 @@ Slobrok_Checker::Main()
initRPC(tmp.str().c_str());
}
- FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
+ FRT_RPCRequest *req = _server->supervisor().AllocRPCRequest();
req->SetMethodName("slobrok.system.version");
_target->InvokeSync(req, 5.0);
diff --git a/slobrok/src/apps/sbcmd/sbcmd.cpp b/slobrok/src/apps/sbcmd/sbcmd.cpp
index c4b76bd7ed0..f32155ae80b 100644
--- a/slobrok/src/apps/sbcmd/sbcmd.cpp
+++ b/slobrok/src/apps/sbcmd/sbcmd.cpp
@@ -12,14 +12,14 @@ LOG_SETUP("vespa-slobrok-cmd");
class Slobrok_CMD : public FastOS_Application
{
private:
- std::unique_ptr<FRT_Supervisor> _supervisor;
+ std::unique_ptr<fnet::frt::StandaloneFRT> _server;
FRT_Target *_target;
Slobrok_CMD(const Slobrok_CMD &);
Slobrok_CMD &operator=(const Slobrok_CMD &);
public:
- Slobrok_CMD() : _supervisor(), _target(nullptr) {}
+ Slobrok_CMD() : _server(), _target(nullptr) {}
virtual ~Slobrok_CMD();
int usage();
void initRPC(const char *spec);
@@ -29,7 +29,7 @@ public:
Slobrok_CMD::~Slobrok_CMD()
{
- LOG_ASSERT(! _supervisor);
+ LOG_ASSERT(! _server);
LOG_ASSERT(_target == nullptr);
}
@@ -56,9 +56,8 @@ Slobrok_CMD::usage()
void
Slobrok_CMD::initRPC(const char *spec)
{
- _supervisor = std::make_unique<FRT_Supervisor>();
- _target = _supervisor->GetTarget(spec);
- _supervisor->Start();
+ _server = std::make_unique<fnet::frt::StandaloneFRT>();
+ _target = _server->supervisor().GetTarget(spec);
}
@@ -69,9 +68,8 @@ Slobrok_CMD::finiRPC()
_target->SubRef();
_target = nullptr;
}
- if (_supervisor) {
- _supervisor->ShutDown(true);
- _supervisor.reset();
+ if (_server) {
+ _server.reset();
}
}
@@ -95,7 +93,7 @@ Slobrok_CMD::Main()
bool threeTables = false;
bool twoTables = false;
- FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
+ FRT_RPCRequest *req = _server->supervisor().AllocRPCRequest();
req->SetMethodName(_argv[2]);
if (strcmp(_argv[2], "slobrok.admin.listAllRpcServers") == 0) {
diff --git a/slobrok/src/tests/configure/configure.cpp b/slobrok/src/tests/configure/configure.cpp
index 7b1c32bceef..2783d0e3ebf 100644
--- a/slobrok/src/tests/configure/configure.cpp
+++ b/slobrok/src/tests/configure/configure.cpp
@@ -97,8 +97,8 @@ Test::Main()
{
TEST_INIT("configure_test");
- FRT_Supervisor orb1;
- FRT_Supervisor orb2;
+ fnet::frt::StandaloneFRT orb1;
+ fnet::frt::StandaloneFRT orb2;
config::ConfigSet set;
cloud::config::SlobroksConfigBuilder srv1Builder;
@@ -141,18 +141,16 @@ Test::Main()
SlobrokServer serverOne(srvConfig1);
SlobrokServer serverTwo(srvConfig2);
- MirrorAPI mirror1(orb1, cliConfig3); // NB this one will be changed
- MirrorAPI mirror2(orb2, cliConfig2);
+ MirrorAPI mirror1(orb1.supervisor(), cliConfig3); // NB this one will be changed
+ MirrorAPI mirror2(orb2.supervisor(), cliConfig2);
- RegisterAPI reg1(orb1, cliConfig1);
- RegisterAPI reg2(orb2, cliConfig2);
+ RegisterAPI reg1(orb1.supervisor(), cliConfig1);
+ RegisterAPI reg2(orb2.supervisor(), cliConfig2);
- orb1.Listen(18526);
- orb2.Listen(18527);
- std::string myspec1 = createSpec(orb1.GetListenPort());
- std::string myspec2 = createSpec(orb2.GetListenPort());
- orb1.Start();
- orb2.Start();
+ orb1.supervisor().Listen(18526);
+ orb2.supervisor().Listen(18527);
+ std::string myspec1 = createSpec(orb1.supervisor().GetListenPort());
+ std::string myspec2 = createSpec(orb2.supervisor().GetListenPort());
reg1.registerName("A");
reg2.registerName("B");
@@ -183,16 +181,14 @@ Test::Main()
reg1.registerName("A");
reg2.registerName("B");
- FRT_Supervisor orb3;
- FRT_Supervisor orb4;
- RegisterAPI reg3(orb3, cliConfig1);
- RegisterAPI reg4(orb4, cliConfig2);
- orb3.Listen(18528);
- orb4.Listen(18529);
- std::string myspec3 = createSpec(orb3.GetListenPort());
- std::string myspec4 = createSpec(orb4.GetListenPort());
- orb3.Start();
- orb4.Start();
+ fnet::frt::StandaloneFRT orb3;
+ fnet::frt::StandaloneFRT orb4;
+ RegisterAPI reg3(orb3.supervisor(), cliConfig1);
+ RegisterAPI reg4(orb4.supervisor(), cliConfig2);
+ orb3.supervisor().Listen(18528);
+ orb4.supervisor().Listen(18529);
+ std::string myspec3 = createSpec(orb3.supervisor().GetListenPort());
+ std::string myspec4 = createSpec(orb4.supervisor().GetListenPort());
reg3.registerName("B");
reg4.registerName("A");
@@ -217,9 +213,5 @@ Test::Main()
serverOne.stop();
serverTwo.stop();
- orb1.ShutDown(true);
- orb2.ShutDown(true);
- orb3.ShutDown(true);
- orb4.ShutDown(true);
TEST_DONE();
}
diff --git a/slobrok/src/tests/mirrorapi/mirrorapi.cpp b/slobrok/src/tests/mirrorapi/mirrorapi.cpp
index f77dfd80986..23cd201c551 100644
--- a/slobrok/src/tests/mirrorapi/mirrorapi.cpp
+++ b/slobrok/src/tests/mirrorapi/mirrorapi.cpp
@@ -21,7 +21,7 @@ TEST_SETUP(Test);
class Server : public FRT_Invokable
{
private:
- FRT_Supervisor _orb;
+ fnet::frt::StandaloneFRT _server;
std::string _name;
std::string _slobrokSpec;
@@ -34,12 +34,12 @@ public:
Server::Server(std::string name, int port, std::string slobrokSpec)
- : _orb(),
+ : _server(),
_name(name),
_slobrokSpec(slobrokSpec)
{
{
- FRT_ReflectionBuilder rb(&_orb);
+ FRT_ReflectionBuilder rb(&_server.supervisor());
//---------------------------------------------------------------------
rb.DefineMethod("slobrok.callback.listNamesServed", "", "S",
FRT_METHOD(Server::rpc_listNamesServed), this);
@@ -47,8 +47,7 @@ Server::Server(std::string name, int port, std::string slobrokSpec)
rb.ReturnDesc("names", "The rpcserver names on this server");
//---------------------------------------------------------------------
}
- _orb.Listen(port);
- _orb.Start();
+ _server.supervisor().Listen(port);
}
@@ -56,14 +55,14 @@ void
Server::reg()
{
char spec[64];
- sprintf(spec, "tcp/localhost:%d", _orb.GetListenPort());
+ sprintf(spec, "tcp/localhost:%d", _server.supervisor().GetListenPort());
- FRT_RPCRequest *req = _orb.AllocRPCRequest();
+ FRT_RPCRequest *req = _server.supervisor().AllocRPCRequest();
req->SetMethodName("slobrok.registerRpcServer");
req->GetParams()->AddString(_name.c_str());
req->GetParams()->AddString(spec);
- FRT_Target *sb = _orb.GetTarget(_slobrokSpec.c_str());
+ FRT_Target *sb = _server.supervisor().GetTarget(_slobrokSpec.c_str());
sb->InvokeSync(req, 5.0);
sb->SubRef();
req->SubRef();
@@ -79,10 +78,7 @@ Server::rpc_listNamesServed(FRT_RPCRequest *req)
}
-Server::~Server()
-{
- _orb.ShutDown(true);
-}
+Server::~Server() = default;
//-----------------------------------------------------------------------------
@@ -140,10 +136,9 @@ Test::Main()
cloud::config::SlobroksConfig::Slobrok slobrok;
slobrok.connectionspec = "tcp/localhost:18501";
specBuilder.slobrok.push_back(slobrok);
- FRT_Supervisor orb;
- MirrorAPI mirror(orb, config::ConfigUri::createFromInstance(specBuilder));
+ fnet::frt::StandaloneFRT server;
+ MirrorAPI mirror(server.supervisor(), config::ConfigUri::createFromInstance(specBuilder));
EXPECT_TRUE(!mirror.ready());
- orb.Start();
FastOS_Thread::Sleep(1000);
a.reg();
@@ -224,6 +219,5 @@ Test::Main()
.add("A/x/w", "tcp/localhost:18502")));
mock.stop();
- orb.ShutDown(true);
TEST_DONE();
}
diff --git a/slobrok/src/tests/oldapi/old.cpp b/slobrok/src/tests/oldapi/old.cpp
index 42cec186a08..26cf94613bd 100644
--- a/slobrok/src/tests/oldapi/old.cpp
+++ b/slobrok/src/tests/oldapi/old.cpp
@@ -19,9 +19,9 @@ TEST_SETUP(Test);
class Server : public FRT_Invokable
{
private:
- FRT_Supervisor _orb;
- std::string _name;
- std::string _slobrokSpec;
+ fnet::frt::StandaloneFRT _server;
+ std::string _name;
+ std::string _slobrokSpec;
public:
Server(std::string name, int port, std::string slobrokSpec);
@@ -32,12 +32,12 @@ public:
Server::Server(std::string name, int port, std::string slobrokSpec)
- : _orb(),
+ : _server(),
_name(name),
_slobrokSpec(slobrokSpec)
{
{
- FRT_ReflectionBuilder rb(&_orb);
+ FRT_ReflectionBuilder rb(&_server.supervisor());
//---------------------------------------------------------------------
rb.DefineMethod("slobrok.callback.listNamesServed", "", "S",
FRT_METHOD(Server::rpc_listNamesServed), this);
@@ -45,8 +45,7 @@ Server::Server(std::string name, int port, std::string slobrokSpec)
rb.ReturnDesc("names", "The rpcserver names on this server");
//---------------------------------------------------------------------
}
- _orb.Listen(port);
- _orb.Start();
+ _server.supervisor().Listen(port);
}
@@ -54,14 +53,14 @@ void
Server::reg()
{
char spec[64];
- sprintf(spec, "tcp/localhost:%d", _orb.GetListenPort());
+ sprintf(spec, "tcp/localhost:%d", _server.supervisor().GetListenPort());
- FRT_RPCRequest *req = _orb.AllocRPCRequest();
+ FRT_RPCRequest *req = _server.supervisor().AllocRPCRequest();
req->SetMethodName("slobrok.registerRpcServer");
req->GetParams()->AddString(_name.c_str());
req->GetParams()->AddString(spec);
- FRT_Target *sb = _orb.GetTarget(_slobrokSpec.c_str());
+ FRT_Target *sb = _server.supervisor().GetTarget(_slobrokSpec.c_str());
sb->InvokeSync(req, 5.0);
sb->SubRef();
req->SubRef();
@@ -77,10 +76,7 @@ Server::rpc_listNamesServed(FRT_RPCRequest *req)
}
-Server::~Server()
-{
- _orb.ShutDown(true);
-}
+Server::~Server() = default;
//-----------------------------------------------------------------------------
@@ -136,10 +132,9 @@ Test::Main()
std::vector<std::string> slobrokSpecs;
slobrokSpecs.push_back("tcp/localhost:18531");
- FRT_Supervisor orb;
- MirrorOld mirror(orb, slobrokSpecs);
+ fnet::frt::StandaloneFRT server;
+ MirrorOld mirror(server.supervisor(), slobrokSpecs);
EXPECT_TRUE(!mirror.ready());
- orb.Start();
FastOS_Thread::Sleep(1000);
a.reg();
@@ -217,6 +212,5 @@ Test::Main()
.add("A/x/w", "tcp/localhost:18532")));
mock.stop();
- orb.ShutDown(true);
TEST_DONE();
}
diff --git a/slobrok/src/tests/registerapi/registerapi.cpp b/slobrok/src/tests/registerapi/registerapi.cpp
index 2b356319ba0..ac7e662c6f2 100644
--- a/slobrok/src/tests/registerapi/registerapi.cpp
+++ b/slobrok/src/tests/registerapi/registerapi.cpp
@@ -83,12 +83,12 @@ Test::Main()
slobrokSpecs.slobrok.push_back(sb);
slobrok::ConfiguratorFactory config(config::ConfigUri::createFromInstance(slobrokSpecs));
- FRT_Supervisor orb;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb = server.supervisor();
RegisterAPI reg(orb, config);
MirrorAPI mirror(orb, config);
orb.Listen(18549);
std::string myspec = createSpec(orb);
- orb.Start();
reg.registerName("A/x/w");
EXPECT_TRUE(reg.busy());
@@ -216,6 +216,5 @@ Test::Main()
.add("F/y/w", myspec.c_str())));
mock.stop();
- orb.ShutDown(true);
TEST_DONE();
}
diff --git a/slobrok/src/tests/standalone/standalone.cpp b/slobrok/src/tests/standalone/standalone.cpp
index 136f8125c8b..9d3fd694ee1 100644
--- a/slobrok/src/tests/standalone/standalone.cpp
+++ b/slobrok/src/tests/standalone/standalone.cpp
@@ -9,7 +9,7 @@
class Server : public FRT_Invokable
{
private:
- FRT_Supervisor _orb;
+ fnet::frt::StandaloneFRT _server;
std::string _name;
public:
@@ -20,11 +20,11 @@ public:
Server::Server(std::string name, int port)
- : _orb(),
+ : _server(),
_name(name)
{
{
- FRT_ReflectionBuilder rb(&_orb);
+ FRT_ReflectionBuilder rb(&_server.supervisor());
//---------------------------------------------------------------------
rb.DefineMethod("slobrok.callback.listNamesServed", "", "S",
FRT_METHOD(Server::rpc_listNamesServed), this);
@@ -32,8 +32,7 @@ Server::Server(std::string name, int port)
rb.ReturnDesc("names", "The rpcserver names on this server");
//---------------------------------------------------------------------
}
- _orb.Listen(port);
- _orb.Start();
+ _server.supervisor().Listen(port);
}
@@ -46,16 +45,13 @@ Server::rpc_listNamesServed(FRT_RPCRequest *req)
}
-Server::~Server()
-{
- _orb.ShutDown(true);
-}
+Server::~Server() = default;
namespace {
bool checkOk(FRT_RPCRequest *req)
{
- if (req == NULL) {
+ if (req == nullptr) {
fprintf(stderr, "req is null pointer, this is bad\n");
return false;
}
@@ -80,7 +76,7 @@ private:
public:
SubReferer(T* &t) : _t(t) {}
~SubReferer() {
- if (_t != NULL) _t->SubRef();
+ if (_t != nullptr) _t->SubRef();
}
};
@@ -118,14 +114,13 @@ TEST("standalone") {
slobrok::SlobrokServer slobrokServer(18541);
Stopper<slobrok::SlobrokServer> ssCleaner(slobrokServer);
- FRT_Supervisor orb;
- orb.Start();
- ShutDowner<FRT_Supervisor> orbCleaner(orb);
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & orb = server.supervisor();
FRT_Target *sb = orb.GetTarget(18541);
SubReferer<FRT_Target> sbCleaner(sb);
- FRT_RPCRequest *req = NULL;
+ FRT_RPCRequest *req = nullptr;
SubReferer<FRT_RPCRequest> reqCleaner(req);
for (int retry=0; retry < 5*61; retry++) {
diff --git a/slobrok/src/tests/startsome/rpc_info.cpp b/slobrok/src/tests/startsome/rpc_info.cpp
index 036c486fe4f..0f8739d12be 100644
--- a/slobrok/src/tests/startsome/rpc_info.cpp
+++ b/slobrok/src/tests/startsome/rpc_info.cpp
@@ -12,16 +12,16 @@ public:
void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor)
{
- if ((*req) != NULL)
+ if ((*req) != nullptr)
(*req)->SubRef();
(*req) = supervisor->AllocRPCRequest();
}
void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2)
{
- if (r1 != NULL)
+ if (r1 != nullptr)
r1->SubRef();
- if (r2 != NULL)
+ if (r2 != nullptr)
r2->SubRef();
}
@@ -75,11 +75,11 @@ public:
}
bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0);
- FRT_Supervisor supervisor;
+ fnet::frt::StandaloneFRT server;
+ FRT_Supervisor & supervisor = server.supervisor();
FRT_Target *target = supervisor.GetTarget(_argv[1]);
- FRT_RPCRequest *m_list = NULL;
- FRT_RPCRequest *info = NULL;
- supervisor.Start();
+ FRT_RPCRequest *m_list = nullptr;
+ FRT_RPCRequest *info = nullptr;
GetReq(&info, &supervisor);
info->SetMethodName("frt.rpc.ping");
@@ -87,7 +87,6 @@ public:
if (info->IsError()) {
fprintf(stderr, "Error talking to %s\n", _argv[1]);
FreeReqs(m_list, info);
- supervisor.ShutDown(true);
return 1;
}
@@ -125,7 +124,6 @@ public:
}
FreeReqs(m_list, info);
target->SubRef();
- supervisor.ShutDown(true);
return 0;
}
};
diff --git a/slobrok/src/tests/startsome/tstdst.cpp b/slobrok/src/tests/startsome/tstdst.cpp
index 4723b3819d7..c119ed3c026 100644
--- a/slobrok/src/tests/startsome/tstdst.cpp
+++ b/slobrok/src/tests/startsome/tstdst.cpp
@@ -132,7 +132,7 @@ RPCHooks::rpc_stop(FRT_RPCRequest *req)
TstEnv::TstEnv(int sbp, int myp, const char *n)
: _transport(new FNET_Transport()),
- _supervisor(new FRT_Supervisor(_transport, NULL)),
+ _supervisor(new FRT_Supervisor(_transport)),
_myport(myp),
_sbport(sbp),
_rpcHooks(NULL),
diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp
index 4e510b61e70..91a283b17f3 100644
--- a/slobrok/src/vespa/slobrok/server/sbenv.cpp
+++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp
@@ -86,7 +86,7 @@ ConfigTask::PerformTask()
SBEnv::SBEnv(const ConfigShim &shim)
: _transport(std::make_unique<FNET_Transport>()),
- _supervisor(std::make_unique<FRT_Supervisor>(_transport.get(), nullptr)),
+ _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
_configShim(shim),
_configurator(shim.factory().create(*this)),
_shuttingDown(false),
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
index ec488b25714..2f1cdc74ce1 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp
@@ -7,6 +7,7 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/host_name.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/transport.h>
#include <sstream>
#include <vespa/log/log.h>
@@ -16,7 +17,9 @@ namespace storage {
FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port)
: _messageEnqueuer(messageEnqueuer),
- _orb(std::make_unique<FRT_Supervisor>()),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_closed(false),
_slobrokRegister(*_orb, configUri)
{
@@ -26,7 +29,7 @@ FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::Confi
ost << "Failed to listen to RPC port " << port << ".";
throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC);
}
- _orb->Start();
+ _transport->Start(_threadPool.get());
}
FNetListener::~FNetListener()
@@ -57,7 +60,7 @@ FNetListener::close()
{
_closed = true;
_slobrokRegister.unregisterName(_handle);
- _orb->ShutDown(true);
+ _transport->ShutDown(true);
}
void
diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h
index 2097be15491..8d24311bc57 100644
--- a/storage/src/vespa/storage/storageserver/fnetlistener.h
+++ b/storage/src/vespa/storage/storageserver/fnetlistener.h
@@ -4,6 +4,8 @@
#include <vespa/slobrok/sbregister.h>
#include <atomic>
+class FNET_Transport;
+
namespace storage {
namespace api { class StorageMessage; }
@@ -33,11 +35,13 @@ public:
int getListenPort() const;
private:
- MessageEnqueuer& _messageEnqueuer;
- std::unique_ptr<FRT_Supervisor> _orb;
- std::atomic<bool> _closed;
- slobrok::api::RegisterAPI _slobrokRegister;
- vespalib::string _handle;
+ MessageEnqueuer& _messageEnqueuer;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ std::atomic<bool> _closed;
+ slobrok::api::RegisterAPI _slobrokRegister;
+ vespalib::string _handle;
void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req);
};
diff --git a/storage/src/vespa/storage/tools/storage-cmd.cpp b/storage/src/vespa/storage/tools/storage-cmd.cpp
index 564574627a7..daaa890873f 100644
--- a/storage/src/vespa/storage/tools/storage-cmd.cpp
+++ b/storage/src/vespa/storage/tools/storage-cmd.cpp
@@ -18,22 +18,22 @@ private:
const char *value = param + 2;
switch (param[0]) {
case 'b':
- req->GetParams()->AddInt8(strtoll(value, NULL, 0));
+ req->GetParams()->AddInt8(strtoll(value, nullptr, 0));
break;
case 'h':
- req->GetParams()->AddInt16(strtoll(value, NULL, 0));
+ req->GetParams()->AddInt16(strtoll(value, nullptr, 0));
break;
case 'i':
- req->GetParams()->AddInt32(strtoll(value, NULL, 0));
+ req->GetParams()->AddInt32(strtoll(value, nullptr, 0));
break;
case 'l':
- req->GetParams()->AddInt64(strtoll(value, NULL, 0));
+ req->GetParams()->AddInt64(strtoll(value, nullptr, 0));
break;
case 'f':
- req->GetParams()->AddFloat(vespalib::locale::c::strtod(value, NULL));
+ req->GetParams()->AddFloat(vespalib::locale::c::strtod(value, nullptr));
break;
case 'd':
- req->GetParams()->AddDouble(vespalib::locale::c::strtod(value, NULL));
+ req->GetParams()->AddDouble(vespalib::locale::c::strtod(value, nullptr));
break;
case 's':
req->GetParams()->AddString(value);
@@ -55,11 +55,10 @@ public:
return 1;
}
int retCode = 0;
- FRT_Supervisor supervisor;
- supervisor.Start();
+ fnet::frt::StandaloneFRT supervisor;
slobrok::ConfiguratorFactory sbcfg("admin/slobrok.0");
- slobrok::api::MirrorAPI mirror(supervisor, sbcfg);
+ slobrok::api::MirrorAPI mirror(supervisor.supervisor(), sbcfg);
while (!mirror.ready()) {
FastOS_Thread::Sleep(10);
@@ -72,11 +71,11 @@ public:
}
for (size_t j = 0; j < list.size(); j++) {
- FRT_Target *target = supervisor.GetTarget(list[j].second.c_str());
+ FRT_Target *target = supervisor.supervisor().GetTarget(list[j].second.c_str());
// If not fleet controller, need to connect first.
- if (strstr(_argv[1], "fleetcontroller") == NULL) {
- FRT_RPCRequest *req = supervisor.AllocRPCRequest();
+ if (strstr(_argv[1], "fleetcontroller") == nullptr) {
+ FRT_RPCRequest *req = supervisor.supervisor().AllocRPCRequest();
req->SetMethodName("vespa.storage.connect");
req->GetParams()->AddString(_argv[1]);
target->InvokeSync(req, 10.0);
@@ -90,7 +89,7 @@ public:
req->SubRef();
}
- FRT_RPCRequest *req = supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = supervisor.supervisor().AllocRPCRequest();
req->SetMethodName(_argv[2]);
for (int i = 3; i < _argc; ++i) {
@@ -115,7 +114,6 @@ public:
req->SubRef();
target->SubRef();
}
- supervisor.ShutDown(true);
return retCode;
}
};
diff --git a/storageserver/src/tests/storageservertest.cpp b/storageserver/src/tests/storageservertest.cpp
index 2ef770b68b1..413acf89f27 100644
--- a/storageserver/src/tests/storageservertest.cpp
+++ b/storageserver/src/tests/storageservertest.cpp
@@ -33,51 +33,50 @@ using document::test::makeDocumentBucket;
namespace storage {
namespace {
- uint64_t getTimeInMillis() {
- struct timeval t;
- gettimeofday(&t, 0);
- return (t.tv_sec * uint64_t(1000)) + (t.tv_usec / uint64_t(1000));
- }
-
- class SlobrokMirror {
- config::ConfigUri config;
- FRT_Supervisor visor;
- std::unique_ptr<slobrok::api::MirrorAPI> mirror;
- public:
- SlobrokMirror(const config::ConfigUri & cfg) : config(cfg) {}
+uint64_t getTimeInMillis() {
+ struct timeval t;
+ gettimeofday(&t, 0);
+ return (t.tv_sec * uint64_t(1000)) + (t.tv_usec / uint64_t(1000));
+}
- void init(uint32_t timeoutms) {
- uint64_t timeout = getTimeInMillis() + timeoutms;
- visor.Start();
- mirror.reset(new slobrok::api::MirrorAPI(visor, config));
- while (!mirror->ready()) {
- if (getTimeInMillis() > timeout)
- throw vespalib::IllegalStateException(
- "Failed to initialize slobrok mirror within "
- "timeout.", VESPA_STRLOC);
- FastOS_Thread::Sleep(1);
- }
+class SlobrokMirror {
+ config::ConfigUri config;
+ fnet::frt::StandaloneFRT visor;
+ std::unique_ptr<slobrok::api::MirrorAPI> mirror;
+
+public:
+ SlobrokMirror(const config::ConfigUri &cfg) : config(cfg) {}
+
+ void init(uint32_t timeoutms) {
+ uint64_t timeout = getTimeInMillis() + timeoutms;
+ mirror.reset(new slobrok::api::MirrorAPI(visor.supervisor(), config));
+ while (!mirror->ready()) {
+ if (getTimeInMillis() > timeout)
+ throw vespalib::IllegalStateException(
+ "Failed to initialize slobrok mirror within "
+ "timeout.", VESPA_STRLOC);
+ FastOS_Thread::Sleep(1);
}
+ }
- slobrok::api::MirrorAPI& getMirror() {
- if (mirror.get() == 0) throw vespalib::IllegalStateException(
+ slobrok::api::MirrorAPI &getMirror() {
+ if (mirror.get() == 0)
+ throw vespalib::IllegalStateException(
"You need to call init() before you can fetch mirror");
- return *mirror;
- }
- FRT_Supervisor& getSupervisor() {
- if (mirror.get() == 0) throw vespalib::IllegalStateException(
+ return *mirror;
+ }
+
+ FRT_Supervisor &getSupervisor() {
+ if (mirror.get() == 0)
+ throw vespalib::IllegalStateException(
"You need to call init() before you can fetch supervisor");
- return visor;
- }
+ return visor.supervisor();
+ }
+
+ ~SlobrokMirror() = default;
+};
- ~SlobrokMirror() {
- if (mirror) {
- mirror.reset();
- visor.ShutDown(true);
- }
- }
- };
}
struct StorageServerTest : public CppUnit::TestFixture {
diff --git a/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp b/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp
index b24eb3e0373..9ec9249b671 100644
--- a/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp
+++ b/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp
@@ -247,17 +247,16 @@ struct StateApp : public FastOS_Application {
}
int run() {
- FRT_Supervisor supervisor;
- supervisor.Start();
+ fnet::frt::StandaloneFRT supervisor;
std::unique_ptr<slobrok::api::MirrorAPI> slobrok;
if (_options._slobrokConnectionSpec == "") {
config::ConfigUri config(_options._slobrokConfigId);
- slobrok.reset(new slobrok::api::MirrorAPI(supervisor, config));
+ slobrok.reset(new slobrok::api::MirrorAPI(supervisor.supervisor(), config));
} else {
std::vector<std::string> specList;
specList.push_back(_options._slobrokConnectionSpec);
- slobrok.reset(new slobrok::api::MirrorAPI(supervisor, specList));
+ slobrok.reset(new slobrok::api::MirrorAPI(supervisor.supervisor(), specList));
}
LOG(debug, "Waiting for slobrok data to be available.");
uint64_t startTime = getTimeInMillis();
@@ -287,7 +286,6 @@ struct StateApp : public FastOS_Application {
}
if (!slobrok->ready()) {
std::cerr << "Slobrok not ready.\n";
- supervisor.ShutDown(true);
return 1;
}
@@ -300,13 +298,12 @@ struct StateApp : public FastOS_Application {
if (specs.size() == 0) {
std::cerr << "No fleet controller could be found for '"
<< mask << ".\n";
- supervisor.ShutDown(true);
return 1;
}
std::sort(specs.begin(), specs.end(), Sorter());
LOG(debug, "Found fleet controller %s - %s\n",
specs.front().first.c_str(), specs.front().second.c_str());
- FRT_Target *target = supervisor.GetTarget(specs.front().second.c_str());
+ FRT_Target *target = supervisor.supervisor().GetTarget(specs.front().second.c_str());
if (!_options._nonfriendlyOutput && _options._mode == GETNODESTATE)
{
std::cerr <<
@@ -341,7 +338,7 @@ struct StateApp : public FastOS_Application {
indexes.push_back(_options._nodeIndex);
} else {
std::string hostname(vespa::Defaults::vespaHostname());
- FRT_RPCRequest* req = supervisor.AllocRPCRequest();
+ FRT_RPCRequest* req = supervisor.supervisor().AllocRPCRequest();
req->SetMethodName("getNodeList");
target->InvokeSync(req, 10.0);
std::string prefix = _options._cluster.getConfigId() + "/" + nodeType + "/";
@@ -380,7 +377,7 @@ struct StateApp : public FastOS_Application {
break;
}
for (uint32_t j=0; j<indexes.size(); ++j) {
- FRT_RPCRequest* req = supervisor.AllocRPCRequest();
+ FRT_RPCRequest* req = supervisor.supervisor().AllocRPCRequest();
if (_options._mode == GETNODESTATE) {
req->SetMethodName("getNodeState");
req->GetParams()->AddString(nodeType.c_str());
@@ -448,7 +445,6 @@ struct StateApp : public FastOS_Application {
}
}
target->SubRef();
- supervisor.ShutDown(true);
return (failed ? 1 : 0);
}
};
diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
index 5f2bbe8e11a..ae64db4d452 100644
--- a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
+++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
@@ -460,8 +460,7 @@ Application::printServices() const
void
Application::getServices(slobrok::api::IMirrorAPI::SpecList &ret, uint32_t depth) const
{
- FRT_Supervisor frt;
- frt.Start();
+ fnet::frt::StandaloneFRT frt;
std::string pattern = "*";
for (uint32_t i = 0; i < depth; ++i) {
@@ -469,14 +468,13 @@ Application::getServices(slobrok::api::IMirrorAPI::SpecList &ret, uint32_t depth
for (slobrok::api::IMirrorAPI::SpecList::iterator it = lst.begin();
it != lst.end(); ++it)
{
- if (isService(frt, it->second)) {
+ if (isService(frt.supervisor(), it->second)) {
ret.push_back(*it);
}
}
pattern.append("/*");
}
- frt.ShutDown(true);
}
bool