diff options
72 files changed, 565 insertions, 660 deletions
diff --git a/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp b/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp index 3407a880ec7..ce0665eae4a 100644 --- a/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp +++ b/config/src/apps/vespa-configproxy-cmd/proxycmd.cpp @@ -16,41 +16,41 @@ Flags::Flags() targethost("localhost"), portnumber(19090) { } -Flags::~Flags() { } +Flags::~Flags() = default; ProxyCmd::ProxyCmd(const Flags& flags) - : _supervisor(NULL), - _target(NULL), - _req(NULL), + : _server(), + _supervisor(nullptr), + _target(nullptr), + _req(nullptr), _flags(flags) { } -ProxyCmd::~ProxyCmd() { } +ProxyCmd::~ProxyCmd() = default; void ProxyCmd::initRPC() { - _supervisor = new FRT_Supervisor(); + _server = std::make_unique<fnet::frt::StandaloneFRT>(); + _supervisor = &_server->supervisor(); _req = _supervisor->AllocRPCRequest(); - _supervisor->Start(); } void ProxyCmd::invokeRPC() { - if (_req == NULL) return; + if (_req == nullptr) return; _target->InvokeSync(_req, 65.0); } void ProxyCmd::finiRPC() { - if (_req != NULL) { + if (_req != nullptr) { _req->SubRef(); - _req = NULL; + _req = nullptr; } - if (_target != NULL) { + if (_target != nullptr) { _target->SubRef(); _target = NULL; } - if (_supervisor != NULL) { - _supervisor->ShutDown(true); - delete _supervisor; - _supervisor = NULL; + if (_server) { + _server.reset(); + _supervisor = nullptr; } } diff --git a/config/src/apps/vespa-configproxy-cmd/proxycmd.h b/config/src/apps/vespa-configproxy-cmd/proxycmd.h index df8f429a3c9..49be0fff885 100644 --- a/config/src/apps/vespa-configproxy-cmd/proxycmd.h +++ b/config/src/apps/vespa-configproxy-cmd/proxycmd.h @@ -9,6 +9,8 @@ class FRT_Target; class FRT_RPCRequest; class FRT_Values; +namespace fnet::frt { class StandaloneFRT; } + struct Flags { vespalib::string method; std::vector<vespalib::string> args; @@ -23,6 +25,7 @@ struct Flags { class ProxyCmd { private: + std::unique_ptr<fnet::frt::StandaloneFRT> _server; FRT_Supervisor *_supervisor; FRT_Target *_target; FRT_RPCRequest *_req; diff --git a/config/src/apps/vespa-get-config/getconfig.cpp b/config/src/apps/vespa-get-config/getconfig.cpp index 290beacf5a4..f23f28b2734 100644 --- a/config/src/apps/vespa-get-config/getconfig.cpp +++ b/config/src/apps/vespa-get-config/getconfig.cpp @@ -20,6 +20,7 @@ using namespace config; class GetConfig : public FastOS_Application { private: + std::unique_ptr<fnet::frt::StandaloneFRT> _server; FRT_Supervisor *_supervisor; FRT_Target *_target; @@ -27,7 +28,7 @@ private: GetConfig &operator=(const GetConfig &); public: - GetConfig() : _supervisor(NULL), _target(NULL) {} + GetConfig() : _server(), _supervisor(nullptr), _target(nullptr) {} virtual ~GetConfig(); int usage(); void initRPC(const char *spec); @@ -38,8 +39,8 @@ public: GetConfig::~GetConfig() { - LOG_ASSERT(_supervisor == NULL); - LOG_ASSERT(_target == NULL); + LOG_ASSERT(_supervisor == nullptr); + LOG_ASSERT(_target == nullptr); } @@ -71,23 +72,22 @@ GetConfig::usage() void GetConfig::initRPC(const char *spec) { - _supervisor = new FRT_Supervisor(); + _server = std::make_unique<fnet::frt::StandaloneFRT>(); + _supervisor = &_server->supervisor(); _target = _supervisor->GetTarget(spec); - _supervisor->Start(); } void GetConfig::finiRPC() { - if (_target != NULL) { + if (_target != nullptr) { _target->SubRef(); - _target = NULL; + _target = nullptr; } - if (_supervisor != NULL) { - _supervisor->ShutDown(true); - delete _supervisor; - _supervisor = NULL; + if (_server) { + _server.reset(); + _supervisor = nullptr; } } @@ -99,8 +99,8 @@ GetConfig::Main() char c = -1; std::vector<vespalib::string> defSchema; - const char *schema = NULL; - const char *defName = NULL; + const char *schema = nullptr; + const char *defName = nullptr; const char *defMD5 = ""; std::string defNamespace("config"); const char *serverHost = "localhost"; @@ -110,7 +110,7 @@ GetConfig::Main() const char *vespaVersionString = nullptr; int64_t generation = 0; - if (configId == NULL) { + if (configId == nullptr) { configId = ""; } const char *configMD5 = ""; @@ -119,7 +119,7 @@ GetConfig::Main() int serverPort = 19090; - const char *optArg = NULL; + const char *optArg = nullptr; int optInd = 0; while ((c = GetOpt("a:n:v:g:i:jlm:c:t:V:w:r:s:p:dh", optArg, optInd)) != -1) { int retval = 1; @@ -181,19 +181,19 @@ GetConfig::Main() } } - if (defName == NULL || serverPort == 0) { + if (defName == nullptr || serverPort == 0) { usage(); return 1; } - if (strchr(defName, '.') != NULL) { + if (strchr(defName, '.') != nullptr) { const char *tmp = defName; defName = strrchr(defName, '.'); defName++; defNamespace = std::string(tmp, defName - tmp - 1); } - if (schema != NULL) { + if (schema != nullptr) { std::ifstream is; is.open(schema); std::string item; diff --git a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp index a88d7deb79a..d681473ce34 100644 --- a/config/src/apps/vespa-ping-configproxy/pingproxy.cpp +++ b/config/src/apps/vespa-ping-configproxy/pingproxy.cpp @@ -12,6 +12,7 @@ LOG_SETUP("vespa-ping-configproxy"); class PingProxy : public FastOS_Application { private: + std::unique_ptr<fnet::frt::StandaloneFRT> _server; FRT_Supervisor *_supervisor; FRT_Target *_target; @@ -19,7 +20,7 @@ private: PingProxy &operator=(const PingProxy &); public: - PingProxy() : _supervisor(NULL), _target(NULL) {} + PingProxy() : _server(), _supervisor(nullptr), _target(nullptr) {} virtual ~PingProxy(); int usage(); void initRPC(const char *spec); @@ -30,8 +31,8 @@ public: PingProxy::~PingProxy() { - LOG_ASSERT(_supervisor == NULL); - LOG_ASSERT(_target == NULL); + LOG_ASSERT(_supervisor == nullptr); + LOG_ASSERT(_target == nullptr); } @@ -48,23 +49,22 @@ PingProxy::usage() void PingProxy::initRPC(const char *spec) { - _supervisor = new FRT_Supervisor(); + _server = std::make_unique<fnet::frt::StandaloneFRT>(); + _supervisor = &_server->supervisor(); _target = _supervisor->GetTarget(spec); - _supervisor->Start(); } void PingProxy::finiRPC() { - if (_target != NULL) { + if (_target != nullptr) { _target->SubRef(); - _target = NULL; + _target = nullptr; } - if (_supervisor != NULL) { - _supervisor->ShutDown(true); - delete _supervisor; - _supervisor = NULL; + if (_server) { + _server.reset(); + _supervisor = nullptr; } } @@ -80,7 +80,7 @@ PingProxy::Main() int clientTimeout = 5; int serverPort = 19090; - const char *optArg = NULL; + const char *optArg = nullptr; int optInd = 0; while ((c = GetOpt("w:s:p:dh", optArg, optInd)) != -1) { switch (c) { diff --git a/config/src/tests/failover/failover.cpp b/config/src/tests/failover/failover.cpp index 990ca761e7e..7e7017ff057 100644 --- a/config/src/tests/failover/failover.cpp +++ b/config/src/tests/failover/failover.cpp @@ -34,7 +34,7 @@ struct RPCServer : public FRT_Invokable { vespalib::Barrier barrier; int64_t gen; - RPCServer() : supervisor(NULL), barrier(2), gen(1) { } + RPCServer() : supervisor(nullptr), barrier(2), gen(1) { } void init(FRT_Supervisor * s) { FRT_ReflectionBuilder rb(s); @@ -81,18 +81,20 @@ struct RPCServer : public FRT_Invokable { void verifyConfig(std::unique_ptr<MyConfig> config) { - ASSERT_TRUE(config.get() != NULL); + ASSERT_TRUE(config); ASSERT_EQUAL("myval", config->myField); } struct ServerFixture { using UP = std::unique_ptr<ServerFixture>; + std::unique_ptr<fnet::frt::StandaloneFRT> frtServer; FRT_Supervisor * supervisor; RPCServer server; Barrier b; const vespalib::string listenSpec; ServerFixture(const vespalib::string & ls) - : supervisor(NULL), + : frtServer(), + supervisor(nullptr), server(), b(2), listenSpec(ls) @@ -106,22 +108,21 @@ struct ServerFixture { void start() { - supervisor = new FRT_Supervisor(); + frtServer = std::make_unique<fnet::frt::StandaloneFRT>(); + supervisor = & frtServer->supervisor(); server.init(supervisor); supervisor->Listen(get_port(listenSpec)); wait(); // Wait until test runner signals we can start - supervisor->Main(); wait(); // Signalling that we have shut down wait(); // Wait for signal saying that supervisor is deleted } void stop() { - if (supervisor != NULL) { - supervisor->ShutDown(true); + if (frtServer) { wait(); // Wait for supervisor to shut down - delete supervisor; - supervisor = NULL; + frtServer.reset(); + supervisor = nullptr; wait(); // Signal that we are done and start can return. } } diff --git a/config/src/tests/file_acquirer/file_acquirer_test.cpp b/config/src/tests/file_acquirer/file_acquirer_test.cpp index 0453c6ddbd0..33bb8f47e09 100644 --- a/config/src/tests/file_acquirer/file_acquirer_test.cpp +++ b/config/src/tests/file_acquirer/file_acquirer_test.cpp @@ -7,8 +7,10 @@ using namespace config; struct ServerFixture : FRT_Invokable { - FRT_Supervisor orb; + fnet::frt::StandaloneFRT server; + FRT_Supervisor &orb; vespalib::string spec; + void init_rpc() { FRT_ReflectionBuilder rb(&orb); rb.DefineMethod("waitFor", "s", "s", FRT_METHOD(ServerFixture::RPC_waitFor), this); @@ -16,24 +18,24 @@ struct ServerFixture : FRT_Invokable { rb.ParamDesc("file_ref", "file reference to wait for and resolve"); rb.ReturnDesc("file_path", "actual path to the requested file"); } - ServerFixture() : orb() { + + ServerFixture() : server(), orb(server.supervisor()) { init_rpc(); orb.Listen(0); spec = vespalib::make_string("tcp/localhost:%d", orb.GetListenPort()); - orb.Start(); } + void RPC_waitFor(FRT_RPCRequest *req) { FRT_Values ¶ms = *req->GetParams(); - FRT_Values &ret = *req->GetReturn(); + FRT_Values &ret = *req->GetReturn(); if (strcmp(params[0]._string._str, "my_ref") == 0) { ret.AddString("my_path"); } else { req->SetError(FRTE_RPC_METHOD_FAILED, "invalid file reference"); } } - ~ServerFixture() { - orb.ShutDown(true); - } + + ~ServerFixture() = default; }; TEST_FF("require that files can be acquired over rpc", ServerFixture(), RpcFileAcquirer(f1.spec)) { diff --git a/config/src/tests/frt/frt.cpp b/config/src/tests/frt/frt.cpp index c225dd9dcf3..f489ca4c7d9 100644 --- a/config/src/tests/frt/frt.cpp +++ b/config/src/tests/frt/frt.cpp @@ -119,17 +119,18 @@ namespace { int errorCode; int timeout; FRT_RPCRequest * ans; - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor; FNET_Scheduler scheduler; vespalib::string address; - ConnectionMock(FRT_RPCRequest * answer = NULL); + ConnectionMock(FRT_RPCRequest * answer = nullptr); ~ConnectionMock(); FRT_RPCRequest * allocRPCRequest() override { return supervisor.AllocRPCRequest(); } void setError(int ec) override { errorCode = ec; } void invoke(FRT_RPCRequest * req, double t, FRT_IRequestWait * waiter) override { timeout = static_cast<int>(t); - if (ans != NULL) + if (ans != nullptr) waiter->RequestDone(ans); else waiter->RequestDone(req); @@ -142,10 +143,11 @@ namespace { : errorCode(0), timeout(0), ans(answer), - supervisor(), + server(), + supervisor(server.supervisor()), address() { } - ConnectionMock::~ConnectionMock() { } + ConnectionMock::~ConnectionMock() = default; struct FactoryMock : public ConnectionFactory { ConnectionMock * current; @@ -284,7 +286,7 @@ TEST("require that v3 request is correctly initialized") { ConfigDefinition origDef(MyConfig::CONFIG_DEF_SCHEMA); FRT_RPCRequest * req = v3req.getRequest(); - ASSERT_TRUE(req != NULL); + ASSERT_TRUE(req != nullptr); FRT_Values & params(*req->GetParams()); std::string json(params[0]._string._str); Slime slime; diff --git a/config/src/vespa/config/file_acquirer/file_acquirer.cpp b/config/src/vespa/config/file_acquirer/file_acquirer.cpp index 140d63aa7e6..a61f480b33f 100644 --- a/config/src/vespa/config/file_acquirer/file_acquirer.cpp +++ b/config/src/vespa/config/file_acquirer/file_acquirer.cpp @@ -4,6 +4,8 @@ #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/target.h> #include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> #include <vespa/log/log.h> LOG_SETUP(".config.file_acquirer"); @@ -11,10 +13,12 @@ LOG_SETUP(".config.file_acquirer"); namespace config { RpcFileAcquirer::RpcFileAcquirer(const vespalib::string &spec) - : _orb(std::make_unique<FRT_Supervisor>()), + : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _spec(spec) { - _orb->Start(); + _transport->Start(_threadPool.get()); } vespalib::string @@ -39,7 +43,7 @@ RpcFileAcquirer::wait_for(const vespalib::string &file_ref, double timeout_s) RpcFileAcquirer::~RpcFileAcquirer() { - _orb->ShutDown(true); + _transport->ShutDown(true); } } // namespace config diff --git a/config/src/vespa/config/file_acquirer/file_acquirer.h b/config/src/vespa/config/file_acquirer/file_acquirer.h index 77b0cf9a821..844e277cbf9 100644 --- a/config/src/vespa/config/file_acquirer/file_acquirer.h +++ b/config/src/vespa/config/file_acquirer/file_acquirer.h @@ -4,6 +4,8 @@ #include <vespa/vespalib/stllike/string.h> class FRT_Supervisor; +class FNET_Transport; +class FastOS_ThreadPool; namespace config { @@ -23,12 +25,14 @@ struct FileAcquirer { class RpcFileAcquirer : public FileAcquirer { private: + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _orb; vespalib::string _spec; public: RpcFileAcquirer(const vespalib::string &spec); vespalib::string wait_for(const vespalib::string &file_ref, double timeout_s) override; - ~RpcFileAcquirer(); + ~RpcFileAcquirer() override; }; } // namespace config diff --git a/config/src/vespa/config/frt/frtconnectionpool.cpp b/config/src/vespa/config/frt/frtconnectionpool.cpp index 4bea8062a03..b7440ceb7f0 100644 --- a/config/src/vespa/config/frt/frtconnectionpool.cpp +++ b/config/src/vespa/config/frt/frtconnectionpool.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/util/host_name.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> namespace config { @@ -26,7 +27,9 @@ FRTConnectionPool::FRTConnectionKey::operator==(const FRTConnectionKey& right) c } FRTConnectionPool::FRTConnectionPool(const ServerSpec & spec, const TimingValues & timingValues) - : _supervisor(std::make_unique<FRT_Supervisor>()), + : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _selectIdx(0), _hostname("") { @@ -35,12 +38,12 @@ FRTConnectionPool::FRTConnectionPool(const ServerSpec & spec, const TimingValues _connections[key] = std::make_shared<FRTConnection>(spec.getHost(i), *_supervisor, timingValues); } setHostname(); - _supervisor->Start(); + _transport->Start(_threadPool.get()); } FRTConnectionPool::~FRTConnectionPool() { - _supervisor->ShutDown(true); + _transport->ShutDown(true); } void diff --git a/config/src/vespa/config/frt/frtconnectionpool.h b/config/src/vespa/config/frt/frtconnectionpool.h index e671c756db5..0b30e723272 100644 --- a/config/src/vespa/config/frt/frtconnectionpool.h +++ b/config/src/vespa/config/frt/frtconnectionpool.h @@ -7,6 +7,9 @@ #include <vector> #include <map> +class FNET_Transport; +class FastOS_ThreadPool; + namespace config { class FRTConnectionPool : public ConnectionFactory { @@ -29,6 +32,8 @@ private: int operator==(const FRTConnectionKey& right) const; }; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _supervisor; int _selectIdx; vespalib::string _hostname; diff --git a/configd/src/apps/cmd/main.cpp b/configd/src/apps/cmd/main.cpp index 6228de86a80..e448fff1c6d 100644 --- a/configd/src/apps/cmd/main.cpp +++ b/configd/src/apps/cmd/main.cpp @@ -14,7 +14,8 @@ LOG_SETUP("vespa-sentinel-cmd"); class Cmd { private: - std::unique_ptr<FRT_Supervisor> _supervisor; + std::unique_ptr<fnet::frt::StandaloneFRT> _server; + FRT_Supervisor *_supervisor; FRT_Target *_target; public: @@ -44,9 +45,9 @@ void usage() void Cmd::initRPC(const char *spec) { - _supervisor = std::make_unique<FRT_Supervisor>(); + _server = std::make_unique<fnet::frt::StandaloneFRT>(); + _supervisor = & _server->supervisor(); _target = _supervisor->GetTarget(spec); - _supervisor->Start(); } @@ -57,9 +58,9 @@ Cmd::finiRPC() _target->SubRef(); _target = nullptr; } - if (_supervisor) { - _supervisor->ShutDown(true); - _supervisor.reset(); + if (_server) { + _server.reset(); + _supervisor = nullptr; } } diff --git a/configd/src/apps/sentinel/rpcserver.cpp b/configd/src/apps/sentinel/rpcserver.cpp index b719d012304..a49cba50e4d 100644 --- a/configd/src/apps/sentinel/rpcserver.cpp +++ b/configd/src/apps/sentinel/rpcserver.cpp @@ -8,22 +8,18 @@ LOG_SETUP(".rpcserver"); namespace config::sentinel { RpcServer::RpcServer(int portNumber, CommandQueue &cmdQ) - : _supervisor(), + : _server(), _rpcHooks(cmdQ), _port(portNumber) { - _rpcHooks.initRPC(&_supervisor); - if (_supervisor.Listen(portNumber)) { + _rpcHooks.initRPC(&_server.supervisor()); + if (_server.supervisor().Listen(portNumber)) { LOG(config, "listening on port %d", portNumber); - _supervisor.Start(); } else { LOG(error, "unable to listen to port %d", portNumber); } } -RpcServer::~RpcServer() -{ - _supervisor.ShutDown(true); -} +RpcServer::~RpcServer() = default; } // namespace config::sentinel diff --git a/configd/src/apps/sentinel/rpcserver.h b/configd/src/apps/sentinel/rpcserver.h index f295975f224..ef4b394fdca 100644 --- a/configd/src/apps/sentinel/rpcserver.h +++ b/configd/src/apps/sentinel/rpcserver.h @@ -13,7 +13,7 @@ namespace config::sentinel { class RpcServer { private: - FRT_Supervisor _supervisor; + fnet::frt::StandaloneFRT _server; RPCHooks _rpcHooks; int _port; diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp index e00164b5a1a..757f3a976ed 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp @@ -20,7 +20,9 @@ namespace documentapi { ExternPolicy::ExternPolicy(const string ¶m) : _lock(), - _orb(std::make_unique<FRT_Supervisor>()), + _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _mirror(), _pattern(), _session(), @@ -56,7 +58,7 @@ ExternPolicy::ExternPolicy(const string ¶m) : slobrok::ConfiguratorFactory config(spec); _mirror.reset(new MirrorAPI(*_orb, config)); - _started = _orb->Start(); + _started = _transport->Start(_threadPool.get()); if (!_started) { _error = "Failed to start FNET supervisor."; return; @@ -81,7 +83,7 @@ ExternPolicy::~ExternPolicy() { _mirror.reset(); if (_started) { - _orb->ShutDown(true); + _transport->ShutDown(true); } } @@ -133,10 +135,9 @@ ExternPolicy::update() IMirrorAPI::SpecList entries = _mirror->lookup(_pattern); if (!entries.empty()) { - for (IMirrorAPI::SpecList::iterator it = entries.begin(); - it != entries.end(); ++it) + for (const auto & spec : entries) { - _recipients.push_back(mbus::Hop::parse(it->second + _session)); + _recipients.push_back(mbus::Hop::parse(spec.second + _session)); } } } diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h index 61182071a54..c56f8f214ec 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h @@ -9,6 +9,8 @@ #include <vespa/documentapi/common.h> class FRT_Supervisor; +class FNET_Transport; +class FastOS_ThreadPool; namespace documentapi { @@ -19,9 +21,11 @@ namespace documentapi { class ExternPolicy : public mbus::IRoutingPolicy { private: using IMirrorAPI = slobrok::api::IMirrorAPI; - vespalib::Lock _lock; - std::unique_ptr<FRT_Supervisor> _orb; - std::unique_ptr<IMirrorAPI> _mirror; + vespalib::Lock _lock; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + std::unique_ptr<IMirrorAPI> _mirror; string _pattern; string _session; string _error; diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp index 5bbfa75012d..18dd525b066 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp @@ -5,6 +5,8 @@ #include <vespa/messagebus/routing/routingcontext.h> #include <vespa/fnet/frt/frt.h> #include <vespa/slobrok/sbmirror.h> +#include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> using slobrok::api::IMirrorAPI; using slobrok::api::MirrorAPI; @@ -14,7 +16,9 @@ namespace documentapi { ExternSlobrokPolicy::ExternSlobrokPolicy(const std::map<string, string>& param) : AsyncInitializationPolicy(param), _firstTry(true), - _orb(std::make_unique<FRT_Supervisor>()), + _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _slobrokConfigId("admin/slobrok.0") { if (param.find("config") != param.end()) { @@ -42,10 +46,10 @@ ExternSlobrokPolicy::ExternSlobrokPolicy(const std::map<string, string>& param) ExternSlobrokPolicy::~ExternSlobrokPolicy() { - bool started = _mirror.get() != NULL; + bool started = (bool)_mirror; _mirror.reset(); if (started) { - _orb->ShutDown(true); + _transport->ShutDown(true); } } @@ -56,13 +60,12 @@ string ExternSlobrokPolicy::init() { } else if (_configSources.size() != 0) { slobrok::ConfiguratorFactory config( config::ConfigUri(_slobrokConfigId, - config::IConfigContext::SP( - new config::ConfigContext(config::ServerSpec(_configSources))))); + std::make_unique<config::ConfigContext>(config::ServerSpec(_configSources)))); _mirror.reset(new MirrorAPI(*_orb, config)); } if (_mirror.get()) { - _orb->Start(); + _transport->Start(_threadPool.get()); } return ""; diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h index dc0534095b6..d2966f852d5 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h @@ -9,6 +9,8 @@ #include <vespa/config/subscription/sourcespec.h> class FRT_Supervisor; +class FNET_Transport; +class FastOS_ThreadPool; namespace documentapi { @@ -22,6 +24,8 @@ protected: bool _firstTry; config::ServerSpec::HostSpecList _configSources; vespalib::Lock _lock; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _orb; std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; std::vector<std::string> _slobroks; diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp index 9d73d38cd1e..cc406224135 100644 --- a/fnet/src/examples/frt/rpc/echo_client.cpp +++ b/fnet/src/examples/frt/rpc/echo_client.cpp @@ -19,9 +19,9 @@ EchoClient::Main() printf("usage : echo_client <connectspec>\n"); return 1; } - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); - supervisor.Start(); FRT_Target *target = supervisor.GetTarget(_argv[1]); FRT_RPCRequest *req = supervisor.AllocRPCRequest(); FRT_Values *args = req->GetParams(); @@ -84,7 +84,6 @@ EchoClient::Main() printf("Return values != parameters.\n"); } req->SubRef(); - supervisor.ShutDown(true); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp index 7c6434e870a..726a500cc55 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp @@ -47,9 +47,9 @@ MyApp::Main() } bool ok = true; RPC rpc; - FRT_Supervisor orb; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb = server.supervisor(); rpc.Init(&orb); - orb.Start(); FRT_Target *target = orb.Get2WayTarget(_argv[1]); FRT_RPCRequest *req = orb.AllocRPCRequest(); @@ -102,7 +102,6 @@ MyApp::Main() req->SubRef(); target->SubRef(); - orb.ShutDown(true); return ok ? 0 : 1; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp index ac7b34ebda0..7e0caaba00d 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp @@ -60,11 +60,12 @@ MyApp::Main() return 1; } RPC rpc; - FRT_Supervisor orb; - rpc.Init(&orb); - orb.Listen(_argv[1]); - FNET_SignalShutDown ssd(*orb.GetTransport()); - orb.Main(); + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); + rpc.Init(&supervisor); + supervisor.Listen(_argv[1]); + FNET_SignalShutDown ssd(*supervisor.GetTransport()); + server.wait_finished(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp index cc230d2bc7c..fc1d54d3440 100644 --- a/fnet/src/examples/frt/rpc/rpc_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_client.cpp @@ -19,9 +19,9 @@ RPCClient::Main() printf("usage : rpc_client <connectspec>\n"); return 1; } - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); - supervisor.Start(); FRT_Target *target = supervisor.GetTarget(_argv[1]); const char *str1 = "abc"; @@ -80,7 +80,6 @@ RPCClient::Main() req->SubRef(); target->SubRef(); - supervisor.ShutDown(true); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp index 5f17e69a10d..d90d22d1986 100644 --- a/fnet/src/examples/frt/rpc/rpc_info.cpp +++ b/fnet/src/examples/frt/rpc/rpc_info.cpp @@ -79,11 +79,12 @@ RPCInfo::Main() } bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0); - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); + FRT_Target *target = supervisor.GetTarget(_argv[1]); FRT_RPCRequest *m_list = nullptr; FRT_RPCRequest *info = nullptr; - supervisor.Start(); GetReq(&info, &supervisor); info->SetMethodName("frt.rpc.ping"); @@ -91,7 +92,6 @@ RPCInfo::Main() if (info->IsError()) { fprintf(stderr, "Error talking to %s\n", _argv[1]); FreeReqs(m_list, info); - supervisor.ShutDown(true); return 1; } @@ -129,7 +129,6 @@ RPCInfo::Main() } FreeReqs(m_list, info); target->SubRef(); - supervisor.ShutDown(true); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp index ea34ffa6905..fb82622a537 100644 --- a/fnet/src/examples/frt/rpc/rpc_invoke.cpp +++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp @@ -71,8 +71,8 @@ int RPCClient::run() { int retCode = 0; - FRT_Supervisor supervisor; - supervisor.Start(); + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); int targetArg = 1; int methNameArg = 2; int startOfArgs = 3; @@ -109,7 +109,6 @@ RPCClient::run() } req->SubRef(); target->SubRef(); - supervisor.ShutDown(true); return retCode; } diff --git a/fnet/src/examples/frt/rpc/rpc_proxy.cpp b/fnet/src/examples/frt/rpc/rpc_proxy.cpp index 496190b2b80..fa152dcec10 100644 --- a/fnet/src/examples/frt/rpc/rpc_proxy.cpp +++ b/fnet/src/examples/frt/rpc/rpc_proxy.cpp @@ -224,18 +224,18 @@ App::Main() } bool verbose = (_argc > 3) && (strcmp(_argv[3], "verbose") == 0); - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); RPCProxy proxy(supervisor, _argv[2], verbose); supervisor.GetReflectionManager()->Reset(); supervisor.SetSessionInitHook(FRT_METHOD(RPCProxy::HOOK_Init), &proxy); supervisor.SetSessionDownHook(FRT_METHOD(RPCProxy::HOOK_Down), &proxy); supervisor.SetSessionFiniHook(FRT_METHOD(RPCProxy::HOOK_Fini), &proxy); - supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch), - &proxy); + supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch), &proxy); supervisor.Listen(_argv[1]); FNET_SignalShutDown ssd(*supervisor.GetTransport()); - supervisor.Main(); + server.wait_finished(); return 0; } diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp index 03d618133c9..cc3972166ad 100644 --- a/fnet/src/examples/frt/rpc/rpc_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_server.cpp @@ -91,12 +91,12 @@ RPCServer::Main(int argc, char **argv) return 1; } - _supervisor = new FRT_Supervisor(); + fnet::frt::StandaloneFRT server; + _supervisor = &server.supervisor(); InitRPC(_supervisor); _supervisor->Listen(argv[1]); FNET_SignalShutDown ssd(*_supervisor->GetTransport()); - _supervisor->Main(); - delete _supervisor; + server.wait_finished(); return 0; } diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp index 5417fddceeb..608a435bd1d 100644 --- a/fnet/src/tests/frt/method_pt/method_pt.cpp +++ b/fnet/src/tests/frt/method_pt/method_pt.cpp @@ -18,6 +18,7 @@ class ComplexHandler3; Test *_test; +std::unique_ptr<fnet::frt::StandaloneFRT> _server; FRT_Supervisor *_supervisor; FRT_Target *_target; SimpleHandler *_simpleHandler; @@ -176,7 +177,8 @@ public: //------------------------------------------------------------- void initTest() { - _supervisor = new FRT_Supervisor(); + _server = std::make_unique<fnet::frt::StandaloneFRT>(); + _supervisor = &_server->supervisor(); _simpleHandler = new SimpleHandler(); _mediumHandler1 = new MediumHandler1(); _mediumHandler2 = new MediumHandler2(); @@ -200,9 +202,6 @@ void initTest() { _target = _supervisor->GetTarget(spec.c_str()); ASSERT_TRUE(_target != nullptr); - bool startOK = _supervisor->Start(); - ASSERT_TRUE(startOK); - FRT_ReflectionBuilder rb(_supervisor); //------------------------------------------------------------------- @@ -247,7 +246,6 @@ void initTest() { void finiTest() { - _supervisor->ShutDown(true); delete _complexHandler1; delete _complexHandler2; delete _complexHandler3; @@ -256,7 +254,7 @@ void finiTest() { delete _mediumHandler3; delete _simpleHandler; _target->SubRef(); - delete _supervisor; + _server.reset(); } diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 2441ea6eaa0..ed4911175a0 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -15,7 +15,7 @@ struct Rpc : FRT_Invokable { FNET_Transport transport; FRT_Supervisor orb; Rpc(CryptoEngine::SP crypto, size_t num_threads) - : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport, &thread_pool) {} + : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport) {} void start() { ASSERT_TRUE(transport.Start(&thread_pool)); } diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp index 0e5f4712e61..cdb2636a8c1 100644 --- a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp +++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp @@ -21,18 +21,19 @@ CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespal TT_Tag req_tag("request"); struct Fixture : FRT_Invokable { - FRT_Supervisor orb; - Fixture(CryptoEngine::SP crypto) : orb(std::move(crypto)) { - ASSERT_TRUE(orb.Listen(0)); + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb; + Fixture(CryptoEngine::SP crypto) + : server(std::move(crypto)), + orb(server.supervisor()) + { + ASSERT_TRUE(orb.Listen(0)); init_rpc(); - ASSERT_TRUE(orb.Start()); } FRT_Target *connect() { return orb.GetTarget(orb.GetListenPort()); } - ~Fixture() { - orb.ShutDown(true); - } + ~Fixture() = default; void init_rpc() { FRT_ReflectionBuilder rb(&orb); rb.DefineMethod("inc", "l", "l", FRT_METHOD(Fixture::rpc_inc), this); diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index ab21c62bb68..43a61cd9bcd 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -39,13 +39,13 @@ struct Server : public FRT_Invokable TEST("detach return invoke") { Receptor receptor; - FRT_Supervisor orb; - Server server(orb, receptor); - ASSERT_TRUE(orb.Listen(0)); - ASSERT_TRUE(orb.Start()); - std::string spec = vespalib::make_string("tcp/localhost:%d", orb.GetListenPort()); - FRT_Target *target = orb.Get2WayTarget(spec.c_str()); - FRT_RPCRequest *req = orb.AllocRPCRequest(); + fnet::frt::StandaloneFRT frtServer; + FRT_Supervisor & supervisor = frtServer.supervisor(); + Server server(supervisor, receptor); + ASSERT_TRUE(supervisor.Listen(0)); + std::string spec = vespalib::make_string("tcp/localhost:%d", supervisor.GetListenPort()); + FRT_Target *target = supervisor.Get2WayTarget(spec.c_str()); + FRT_RPCRequest *req = supervisor.AllocRPCRequest(); req->SetMethodName("hook"); target->InvokeSync(req, 5.0); @@ -58,7 +58,6 @@ TEST("detach return invoke") { } req->SubRef(); target->SubRef(); - orb.ShutDown(true); if (receptor.req != 0) { EXPECT_TRUE(!receptor.req->IsError()); receptor.req->SubRef(); diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index a84e3049704..410a60fa08a 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -249,8 +249,8 @@ public: class Fixture { private: - FRT_Supervisor _client; - FRT_Supervisor _server; + fnet::frt::StandaloneFRT _client; + fnet::frt::StandaloneFRT _server; vespalib::string _peerSpec; FRT_Target *_target; TestRPC _testRPC; @@ -258,7 +258,7 @@ private: public: FRT_Target &target() { return *_target; } - FRT_Target *make_bad_target() { return _client.GetTarget("bogus address"); } + FRT_Target *make_bad_target() { return _client.supervisor().GetTarget("bogus address"); } RequestLatch &detached_req() { return _testRPC.detached_req(); } EchoTest &echo() { return _echoTest; } @@ -267,16 +267,14 @@ public: _server(crypto), _peerSpec(), _target(nullptr), - _testRPC(&_server), - _echoTest(&_server) + _testRPC(&_server.supervisor()), + _echoTest(&_server.supervisor()) { - _client.GetTransport()->SetTCPNoDelay(true); - _server.GetTransport()->SetTCPNoDelay(true); - ASSERT_TRUE(_server.Listen("tcp/0")); - ASSERT_TRUE(_server.Start()); - ASSERT_TRUE(_client.Start()); - _peerSpec = SocketSpec::from_host_port("localhost", _server.GetListenPort()).spec(); - _target = _client.GetTarget(_peerSpec.c_str()); + _client.supervisor().GetTransport()->SetTCPNoDelay(true); + _server.supervisor().GetTransport()->SetTCPNoDelay(true); + ASSERT_TRUE(_server.supervisor().Listen("tcp/0")); + _peerSpec = SocketSpec::from_host_port("localhost", _server.supervisor().GetListenPort()).spec(); + _target = _client.supervisor().GetTarget(_peerSpec.c_str()); //--------------------------------------------------------------------- MyReq req("frt.rpc.ping"); target().InvokeSync(req.borrow(), timeout); @@ -284,8 +282,6 @@ public: } ~Fixture() { - _client.ShutDown(true); - _server.ShutDown(true); _target->SubRef(); } }; diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp index 96d20ae9c18..24cbedb3ff7 100644 --- a/fnet/src/tests/frt/rpc/session.cpp +++ b/fnet/src/tests/frt/rpc/session.cpp @@ -86,14 +86,13 @@ struct RPC : public FRT_Invokable } }; -TEST("session") { - RPC rpc; - FRT_Supervisor orb(crypto); +void testSession(RPC & rpc) { + fnet::frt::StandaloneFRT frt(crypto); + FRT_Supervisor & orb = frt.supervisor(); char spec[64]; rpc.Init(&orb); ASSERT_TRUE(orb.Listen("tcp/0")); sprintf(spec, "tcp/localhost:%d", orb.GetListenPort()); - ASSERT_TRUE(orb.Start()); FRT_Target *target = orb.GetTarget(spec); FRT_RPCRequest *req = orb.AllocRPCRequest(); @@ -122,7 +121,10 @@ TEST("session") { req->SubRef(); target->SubRef(); - orb.ShutDown(true); +} +TEST("session") { + RPC rpc; + testSession(rpc); EXPECT_TRUE(Session::GetCnt() == 0); EXPECT_TRUE(!rpc.bogusFini); }; diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp index a48ecbb1da7..1c0503454c7 100644 --- a/fnet/src/tests/frt/rpc/sharedblob.cpp +++ b/fnet/src/tests/frt/rpc/sharedblob.cpp @@ -118,7 +118,8 @@ struct ServerSampler : public FRT_Invokable }; TEST("testExplicitShared") { - FRT_Supervisor orb; + fnet::frt::StandaloneFRT frt; + FRT_Supervisor & orb = frt.supervisor(); MyBlob blob; FRT_RPCRequest *req = orb.AllocRPCRequest(); @@ -171,7 +172,8 @@ TEST("testExplicitShared") { TEST("testImplicitShared") { DataSet dataSet; - FRT_Supervisor orb; + fnet::frt::StandaloneFRT frt; + FRT_Supervisor & orb = frt.supervisor(); FRT_RPCRequest *req = orb.AllocRPCRequest(); ServerSampler serverSampler(dataSet, req); { @@ -182,7 +184,6 @@ TEST("testImplicitShared") { orb.Listen(0); int port = orb.GetListenPort(); ASSERT_TRUE(port != 0); - orb.Start(); char tmp[64]; snprintf(tmp, sizeof(tmp), "tcp/localhost:%d", port); @@ -255,7 +256,6 @@ TEST("testImplicitShared") { } req->SubRef(); target->SubRef(); - orb.ShutDown(true); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index f76e66c2af6..b91b9fa4b39 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -37,12 +37,12 @@ struct RPC : public FRT_Invokable TEST("info") { RPC rpc; - FRT_Supervisor orb; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb = server.supervisor(); char spec[64]; rpc.Init(&orb); ASSERT_TRUE(orb.Listen("tcp/0")); sprintf(spec, "tcp/localhost:%d", orb.GetListenPort()); - ASSERT_TRUE(orb.Start()); FRT_Target *target = orb.GetTarget(spec); FRT_RPCRequest *local_info = orb.AllocRPCRequest(); @@ -65,7 +65,6 @@ TEST("info") { target->SubRef(); local_info->SubRef(); remote_info->SubRef(); - orb.ShutDown(true); }; TEST("size of important objects") diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index e509223c005..d5c7e0847f7 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -9,57 +9,25 @@ #include <vespa/fnet/connector.h> #include <vespa/fastos/thread.h> -FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport, - FastOS_ThreadPool *threadPool) +FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport) : _transport(transport), - _threadPool(threadPool), - _standAlone(false), _packetFactory(), _packetStreamer(&_packetFactory), _connector(nullptr), _reflectionManager(), _rpcHooks(&_reflectionManager), _connHooks(*this), - _methodMismatchHook(nullptr) + _methodMismatchHook() { _rpcHooks.InitRPC(this); } -FRT_Supervisor::FRT_Supervisor(vespalib::CryptoEngine::SP crypto, - uint32_t threadStackSize, - uint32_t maxThreads) - : _transport(nullptr), - _threadPool(nullptr), - _standAlone(true), - _packetFactory(), - _packetStreamer(&_packetFactory), - _connector(nullptr), - _reflectionManager(), - _rpcHooks(&_reflectionManager), - _connHooks(*this), - _methodMismatchHook(nullptr) -{ - _transport = new FNET_Transport(std::move(crypto), 1); - assert(_transport != nullptr); - if (threadStackSize > 0) { - _threadPool = new FastOS_ThreadPool(threadStackSize, maxThreads); - assert(_threadPool != nullptr); - } - _rpcHooks.InitRPC(this); -} - - FRT_Supervisor::~FRT_Supervisor() { - if (_standAlone) { - delete _transport; - delete _threadPool; - } if (_connector != nullptr) { _connector->SubRef(); } - delete _methodMismatchHook; } FNET_Scheduler * @@ -134,43 +102,35 @@ FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein) void -FRT_Supervisor::SetSessionInitHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler) { _connHooks.SetSessionInitHook(method, handler); } void -FRT_Supervisor::SetSessionDownHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler) { _connHooks.SetSessionDownHook(method, handler); } void -FRT_Supervisor::SetSessionFiniHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::SetSessionFiniHook(FRT_METHOD_PT method, FRT_Invokable *handler) { _connHooks.SetSessionFiniHook(method, handler); } void -FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method, FRT_Invokable *handler) { - delete _methodMismatchHook; - _methodMismatchHook = new FRT_Method("frt.hook.methodMismatch", "*", "*", - method, handler); - assert(_methodMismatchHook != nullptr); + _methodMismatchHook = std::make_unique<FRT_Method>("frt.hook.methodMismatch", "*", "*", method, handler); } void -FRT_Supervisor::InvokeVoid(FNET_Connection *conn, - FRT_RPCRequest *req) +FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req) { if (conn != nullptr) { FNET_Channel *ch = conn->OpenChannel(); @@ -183,11 +143,7 @@ FRT_Supervisor::InvokeVoid(FNET_Connection *conn, void -FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, - FNET_Connection *conn, - FRT_RPCRequest *req, - double timeout, - FRT_IRequestWait *waiter) +FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout, FRT_IRequestWait *waiter) { uint32_t chid; FNET_Packet *packet = req->CreateRequestPacket(true); @@ -209,10 +165,7 @@ FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, void -FRT_Supervisor::InvokeSync(SchedulerPtr scheduler, - FNET_Connection *conn, - FRT_RPCRequest *req, - double timeout) +FRT_Supervisor::InvokeSync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout) { FRT_SingleReqWait waiter; InvokeAsync(scheduler, conn, req, timeout, &waiter); @@ -250,7 +203,7 @@ FNET_IPacketHandler::HP_RetCode FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) { uint32_t pcode = packet->GetPCODE() & 0xffff; // remove flags - FRT_RPCRequest *req = (FRT_RPCRequest *) context._value.VOIDP; + auto *req = (FRT_RPCRequest *) context._value.VOIDP; FRT_RPCInvoker *invoker = nullptr; bool noReply = false; @@ -265,9 +218,9 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) if (req->IsError()) { if (req->GetErrorCode() != FRTE_RPC_BAD_REQUEST - && _methodMismatchHook != nullptr) + && _methodMismatchHook) { - invoker->ForceMethod(_methodMismatchHook); + invoker->ForceMethod(_methodMismatchHook.get()); return (invoker->Invoke()) ? FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL; } @@ -282,40 +235,6 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) } } - -bool -FRT_Supervisor::Start() -{ - assert(_standAlone); - if (_threadPool == nullptr) - return false; - return _transport->Start(_threadPool); -} - - -void -FRT_Supervisor::Main() -{ - assert(_standAlone); - _transport->Main(); -} - - -void -FRT_Supervisor::ShutDown(bool waitFinished) -{ - assert(_standAlone); - _transport->ShutDown(waitFinished); -} - - -void -FRT_Supervisor::WaitFinished() -{ - assert(_standAlone); - _transport->WaitFinished(); -} - //---------------------------------------------------- // RPC Hooks //---------------------------------------------------- @@ -403,57 +322,38 @@ FRT_Supervisor::RPCHooks::RPC_GetMethodInfo(FRT_RPCRequest *req) FRT_Supervisor::ConnHooks::ConnHooks(FRT_Supervisor &parent) : _parent(parent), - _sessionInitHook(nullptr), - _sessionDownHook(nullptr), - _sessionFiniHook(nullptr) + _sessionInitHook(), + _sessionDownHook(), + _sessionFiniHook() { } -FRT_Supervisor::ConnHooks::~ConnHooks() -{ - delete _sessionInitHook; - delete _sessionDownHook; - delete _sessionFiniHook; -} - +FRT_Supervisor::ConnHooks::~ConnHooks() = default; void -FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler) { - delete _sessionInitHook; - _sessionInitHook = new FRT_Method("frt.hook.sessionInit", "", "", - method, handler); - assert(_sessionInitHook != nullptr); + _sessionInitHook = std::make_unique<FRT_Method>("frt.hook.sessionInit", "", "", method, handler); } void -FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler) { - delete _sessionDownHook; - _sessionDownHook = new FRT_Method("frt.hook.sessionDown", "", "", - method, handler); - assert(_sessionDownHook != nullptr); + _sessionDownHook = std::make_unique<FRT_Method>("frt.hook.sessionDown", "", "", method, handler); } void -FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method, - FRT_Invokable *handler) +FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method, FRT_Invokable *handler) { - delete _sessionFiniHook; - _sessionFiniHook = new FRT_Method("frt.hook.sessionFini", "", "", - method, handler); - assert(_sessionFiniHook != nullptr); + _sessionFiniHook = std::make_unique<FRT_Method>("frt.hook.sessionFini", "", "", method, handler); } void -FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook, - FNET_Connection *conn) +FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook, FNET_Connection *conn) { FRT_RPCRequest *req = _parent.AllocRPCRequest(); req->SetMethodName(hook->GetName()); @@ -466,8 +366,8 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel) { FNET_Connection *conn = channel->GetConnection(); conn->SetCleanupHandler(this); - if (_sessionInitHook != nullptr) { - InvokeHook(_sessionInitHook, conn); + if (_sessionInitHook) { + InvokeHook(_sessionInitHook.get(), conn); } channel->SetHandler(this); channel->SetContext(channel); @@ -476,16 +376,15 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel) FNET_IPacketHandler::HP_RetCode -FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, - FNET_Context context) +FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, FNET_Context context) { if (!packet->IsChannelLostCMD()) { packet->Free(); return FNET_KEEP_CHANNEL; } FNET_Channel *ch = context._value.CHANNEL; - if (_sessionDownHook != nullptr) { - InvokeHook(_sessionDownHook, ch->GetConnection()); + if (_sessionDownHook) { + InvokeHook(_sessionDownHook.get(), ch->GetConnection()); } return FNET_FREE_CHANNEL; } @@ -494,8 +393,8 @@ FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, void FRT_Supervisor::ConnHooks::Cleanup(FNET_Connection *conn) { - if (_sessionFiniHook != nullptr) { - InvokeHook(_sessionFiniHook, conn); + if (_sessionFiniHook) { + InvokeHook(_sessionFiniHook.get(), conn); } } @@ -506,3 +405,33 @@ FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_Transport *transport) FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_thread) : ptr(transport_thread->GetScheduler()) { } + +namespace fnet::frt { + +StandaloneFRT::StandaloneFRT() + : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())) +{ + _transport->Start(_threadPool.get()); +} + +StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto) + : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>(std::move(crypto), 1)), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())) +{ + _transport->Start(_threadPool.get()); +} + +StandaloneFRT::~StandaloneFRT() +{ + _transport->ShutDown(true); +} + +void +StandaloneFRT::wait_finished() const { + _transport->WaitFinished(); +} + +} diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h index dc7fb496239..1247cb08402 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.h +++ b/fnet/src/vespa/fnet/frt/supervisor.h @@ -9,7 +9,6 @@ #include <vespa/fnet/ipackethandler.h> #include <vespa/fnet/connection.h> #include <vespa/fnet/simplepacketstreamer.h> -#include <vespa/vespalib/net/crypto_engine.h> class FNET_Transport; class FRT_Target; @@ -18,6 +17,9 @@ class FNET_Scheduler; class FRT_RPCInvoker; class FRT_IRequestWait; +namespace vespalib { class CryptoEngine; } + + class FRT_Supervisor : public FNET_IServerAdapter, public FNET_IPacketHandler { @@ -26,12 +28,10 @@ public: { private: FRT_ReflectionManager *_reflectionManager; - - RPCHooks(const RPCHooks &); - RPCHooks &operator=(const RPCHooks &); - public: - RPCHooks(FRT_ReflectionManager *reflect) + RPCHooks(const RPCHooks &) = delete; + RPCHooks &operator=(const RPCHooks &) = delete; + explicit RPCHooks(FRT_ReflectionManager *reflect) : _reflectionManager(reflect) {} void InitRPC(FRT_Supervisor *supervisor); @@ -45,17 +45,15 @@ public: public FNET_IPacketHandler { private: - FRT_Supervisor &_parent; - FRT_Method *_sessionInitHook; - FRT_Method *_sessionDownHook; - FRT_Method *_sessionFiniHook; - - ConnHooks(const ConnHooks &); - ConnHooks &operator=(const ConnHooks &); - + FRT_Supervisor &_parent; + std::unique_ptr<FRT_Method> _sessionInitHook; + std::unique_ptr<FRT_Method> _sessionDownHook; + std::unique_ptr<FRT_Method> _sessionFiniHook; public: - ConnHooks(FRT_Supervisor &parent); - ~ConnHooks(); + ConnHooks(const ConnHooks &) = delete; + ConnHooks &operator=(const ConnHooks &) = delete; + explicit ConnHooks(FRT_Supervisor &parent); + ~ConnHooks() override; void SetSessionInitHook(FRT_METHOD_PT method, FRT_Invokable *handler); void SetSessionDownHook(FRT_METHOD_PT method, FRT_Invokable *handler); @@ -67,32 +65,23 @@ public: }; private: - FNET_Transport *_transport; - FastOS_ThreadPool *_threadPool; - bool _standAlone; - - FRT_PacketFactory _packetFactory; - FNET_SimplePacketStreamer _packetStreamer; - FNET_Connector *_connector; - FRT_ReflectionManager _reflectionManager; - RPCHooks _rpcHooks; - ConnHooks _connHooks; - FRT_Method *_methodMismatchHook; - - FRT_Supervisor(const FRT_Supervisor &); - FRT_Supervisor &operator=(const FRT_Supervisor &); + FNET_Transport *_transport; + FRT_PacketFactory _packetFactory; + FNET_SimplePacketStreamer _packetStreamer; + FNET_Connector *_connector; + FRT_ReflectionManager _reflectionManager; + RPCHooks _rpcHooks; + ConnHooks _connHooks; + std::unique_ptr<FRT_Method> _methodMismatchHook; public: - FRT_Supervisor(FNET_Transport *transport, FastOS_ThreadPool *threadPool); - FRT_Supervisor(vespalib::CryptoEngine::SP crypto, uint32_t threadStackSize = 65000, uint32_t maxThreads = 0); - FRT_Supervisor(uint32_t threadStackSize = 65000, uint32_t maxThreads = 0) - : FRT_Supervisor(vespalib::CryptoEngine::get_default(), threadStackSize, maxThreads) {} - virtual ~FRT_Supervisor(); + explicit FRT_Supervisor(FNET_Transport *transport); + FRT_Supervisor(const FRT_Supervisor &) = delete; + FRT_Supervisor &operator=(const FRT_Supervisor &) = delete; + ~FRT_Supervisor() override; - bool StandAlone() { return _standAlone; } FNET_Transport *GetTransport() { return _transport; } FNET_Scheduler *GetScheduler(); - FastOS_ThreadPool *GetThreadPool() { return _threadPool; } FRT_ReflectionManager *GetReflectionManager() { return &_reflectionManager; } bool Listen(const char *spec); @@ -100,8 +89,7 @@ public: uint32_t GetListenPort() const; FRT_Target *GetTarget(const char *spec); - FRT_Target *Get2WayTarget(const char *spec, - FNET_Context connContext = FNET_Context()); + FRT_Target *Get2WayTarget(const char *spec, FNET_Context connContext = FNET_Context()); FRT_Target *GetTarget(int port); FRT_RPCRequest *AllocRPCRequest(FRT_RPCRequest *tradein = nullptr); @@ -120,17 +108,10 @@ public: }; // methods for performing rpc invocations - static void InvokeVoid(FNET_Connection *conn, - FRT_RPCRequest *req); - static void InvokeAsync(SchedulerPtr scheduler, - FNET_Connection *conn, - FRT_RPCRequest *req, - double timeout, - FRT_IRequestWait *waiter); - static void InvokeSync(SchedulerPtr scheduler, - FNET_Connection *conn, - FRT_RPCRequest *req, - double timeout); + static void InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req); + static void InvokeSync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout); + static void InvokeAsync(SchedulerPtr scheduler, FNET_Connection *conn, FRT_RPCRequest *req, double timeout, FRT_IRequestWait *waiter); + // FNET ServerAdapter Interface bool InitAdminChannel(FNET_Channel *channel) override; @@ -138,11 +119,26 @@ public: // Packet Handling HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override; +}; - // Methods for controlling transport object in standalone mode - bool Start(); - void Main(); - void ShutDown(bool waitFinished); - void WaitFinished(); +namespace fnet::frt { + +/** + * This is a simple class that makes it easy to test RPC. + * Normally you do not want use it in production code as it hides your possibilites and responsibilities. + */ +class StandaloneFRT { +public: + StandaloneFRT(); + explicit StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto); + ~StandaloneFRT(); + FRT_Supervisor & supervisor() { return *_supervisor; } + // TODO Remove this method as it is a relic from the ancient non-threaded world. + void wait_finished() const; +private: + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _supervisor; }; +} diff --git a/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp b/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp index 89d8cd881a8..b45d7986842 100644 --- a/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp +++ b/jrt_test/src/jrt-test/simpleserver/simpleserver.cpp @@ -38,8 +38,7 @@ public: void rpc_inc(FRT_RPCRequest *req) { - req->GetReturn()->AddInt32(req->GetParams()->GetValue(0)._intval32 - + 1); + req->GetReturn()->AddInt32(req->GetParams()->GetValue(0)._intval32 + 1); } void rpc_blob(FRT_RPCRequest *req) @@ -79,10 +78,10 @@ App::Main() printf("usage: %s <listenspec>\n", _argv[0]); return 1; } - FRT_Supervisor orb; - Server server(&orb); - orb.Listen(_argv[1]); - orb.Main(); + fnet::frt::StandaloneFRT frtServer; + Server server(&frtServer.supervisor()); + frtServer.supervisor().Listen(_argv[1]); + frtServer.wait_finished(); return 0; } diff --git a/jrt_test/src/tests/echo/echo-client.cpp b/jrt_test/src/tests/echo/echo-client.cpp index 18e5892ef71..c1ffe29e1ce 100644 --- a/jrt_test/src/tests/echo/echo-client.cpp +++ b/jrt_test/src/tests/echo/echo-client.cpp @@ -12,9 +12,9 @@ public: printf("usage : echo_client <connectspec>\n"); return 1; } - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); - supervisor.Start(); FRT_Target *target = supervisor.GetTarget(_argv[1]); FRT_RPCRequest *req = supervisor.AllocRPCRequest(); FRT_Values *args = req->GetParams(); @@ -77,7 +77,6 @@ public: printf("Return values != parameters.\n"); } req->SubRef(); - supervisor.ShutDown(true); return 0; } }; diff --git a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp index a465316343d..40c54980c11 100644 --- a/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp +++ b/jrt_test/src/tests/mandatory-methods/extract-reflection.cpp @@ -9,16 +9,16 @@ public: void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor) { - if ((*req) != NULL) + if ((*req) != nullptr) (*req)->SubRef(); (*req) = supervisor->AllocRPCRequest(); } void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2) { - if (r1 != NULL) + if (r1 != nullptr) r1->SubRef(); - if (r2 != NULL) + if (r2 != nullptr) r2->SubRef(); } @@ -72,11 +72,11 @@ public: } bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0); - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); FRT_Target *target = supervisor.GetTarget(_argv[1]); - FRT_RPCRequest *m_list = NULL; - FRT_RPCRequest *info = NULL; - supervisor.Start(); + FRT_RPCRequest *m_list = nullptr; + FRT_RPCRequest *info = nullptr; for (int i = 0; i < 50; i++) { GetReq(&info, &supervisor); @@ -93,7 +93,6 @@ public: fprintf(stderr, "Error talking to %s\n", _argv[1]); info->Print(); FreeReqs(m_list, info); - supervisor.ShutDown(true); return 1; } @@ -131,7 +130,6 @@ public: } FreeReqs(m_list, info); target->SubRef(); - supervisor.ShutDown(true); return 0; } }; diff --git a/jrt_test/src/tests/mockup-invoke/mockup-server.cpp b/jrt_test/src/tests/mockup-invoke/mockup-server.cpp index 8456bee1e41..4e1d57e2ce6 100644 --- a/jrt_test/src/tests/mockup-invoke/mockup-server.cpp +++ b/jrt_test/src/tests/mockup-invoke/mockup-server.cpp @@ -51,10 +51,10 @@ App::Main() printf("usage: %s <listenspec>\n", _argv[0]); return 1; } - FRT_Supervisor orb; - MockupServer server(&orb); - orb.Listen(_argv[1]); - orb.Main(); + fnet::frt::StandaloneFRT orb; + MockupServer server(&orb.supervisor()); + orb.supervisor().Listen(_argv[1]); + orb.wait_finished(); return 0; } diff --git a/jrt_test/src/tests/rpc-error/test-errors.cpp b/jrt_test/src/tests/rpc-error/test-errors.cpp index 22e1007a517..c30af8ea579 100644 --- a/jrt_test/src/tests/rpc-error/test-errors.cpp +++ b/jrt_test/src/tests/rpc-error/test-errors.cpp @@ -5,22 +5,20 @@ class TestErrors : public vespalib::TestApp { private: + fnet::frt::StandaloneFRT server; FRT_Supervisor *client; FRT_Target *target; public: void init(const char *spec) { - client = new FRT_Supervisor; + client = & server.supervisor(); target = client->GetTarget(spec); - client->Start(); } void fini() { - target->SubRef(); - target = NULL; - client->ShutDown(true); - delete client; - client = NULL; + target->SubRef(); + target = nullptr; + client = nullptr; } void testNoError(); diff --git a/logd/src/logd/config_subscriber.cpp b/logd/src/logd/config_subscriber.cpp index ddcfcd38aae..7981e13a9a1 100644 --- a/logd/src/logd/config_subscriber.cpp +++ b/logd/src/logd/config_subscriber.cpp @@ -100,12 +100,11 @@ ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri) _handle(), _has_available(false), _need_new_forwarder(true), - _supervisor() + _server() { _handle = _subscriber.subscribe<LogdConfig>(configUri.getConfigId()); _subscriber.nextConfig(0); configure(_handle->getConfig()); - _supervisor.Start(); LOG(debug, "got logServer %s", _logserver_host.c_str()); LOG(debug, "got handle %p", _handle.get()); @@ -113,7 +112,6 @@ ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri) ConfigSubscriber::~ConfigSubscriber() { - _supervisor.ShutDown(true); LOG(debug, "forget logServer %s", _logserver_host.c_str()); LOG(debug, "done ~ConfSub()"); } @@ -123,7 +121,7 @@ ConfigSubscriber::make_forwarder(Metrics& metrics) { std::unique_ptr<Forwarder> result; if (_use_logserver) { - result = std::make_unique<RpcForwarder>(metrics, _forward_filter, _supervisor, _logserver_host, + result = std::make_unique<RpcForwarder>(metrics, _forward_filter, _server.supervisor(), _logserver_host, _logserver_rpc_port, 60.0, 100); } else { result = std::make_unique<EmptyForwarder>(metrics); diff --git a/logd/src/logd/config_subscriber.h b/logd/src/logd/config_subscriber.h index 4d0938dfa3b..a60dac4a367 100644 --- a/logd/src/logd/config_subscriber.h +++ b/logd/src/logd/config_subscriber.h @@ -28,8 +28,7 @@ private: config::ConfigHandle<cloud::config::log::LogdConfig>::UP _handle; bool _has_available; bool _need_new_forwarder; - FRT_Supervisor _supervisor; - + fnet::frt::StandaloneFRT _server; public: bool checkAvailable(); void latch(); diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp index 5b70434e793..92a641f639e 100644 --- a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp +++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp @@ -31,7 +31,7 @@ decode_log_request(FRT_Values& src, ProtoConverter::ProtoLogRequest& dst) std::string garbage("garbage"); struct RpcServer : public FRT_Invokable { - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; int request_count; std::vector<std::string> messages; bool reply_with_error; @@ -39,23 +39,20 @@ struct RpcServer : public FRT_Invokable { public: RpcServer() - : supervisor(), + : server(), request_count(0), messages(), reply_with_error(false), reply_with_proto_response(true) { - supervisor.Listen(0); - FRT_ReflectionBuilder builder(&supervisor); + server.supervisor().Listen(0); + FRT_ReflectionBuilder builder(&server.supervisor()); builder.DefineMethod("vespa.logserver.archiveLogMessages", "bix", "bix", FRT_METHOD(RpcServer::rpc_archive_log_messages), this); - supervisor.Start(); - } - ~RpcServer() { - supervisor.ShutDown(true); } + ~RpcServer() = default; int get_listen_port() { - return supervisor.GetListenPort(); + return server.supervisor().GetListenPort(); } void rpc_archive_log_messages(FRT_RPCRequest* request) { ProtoConverter::ProtoLogRequest proto_request; @@ -96,17 +93,14 @@ struct MockMetricsManager : public DummyMetricsManager { class ClientSupervisor { private: - FRT_Supervisor _supervisor; + fnet::frt::StandaloneFRT _client; public: ClientSupervisor() - : _supervisor() + : _client() { - _supervisor.Start(); - } - ~ClientSupervisor() { - _supervisor.ShutDown(true); } - FRT_Supervisor& get() { return _supervisor; } + ~ClientSupervisor() = default; + FRT_Supervisor& get() { return _client.supervisor(); } }; diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp index 46a8a6518f0..1706da3b55f 100644 --- a/messagebus/src/tests/shutdown/shutdown.cpp +++ b/messagebus/src/tests/shutdown/shutdown.cpp @@ -37,21 +37,18 @@ TEST_APPHOOK(Test); void Test::requireThatListenFailedIsExceptionSafe() { - FRT_Supervisor orb; - ASSERT_TRUE(orb.Listen(0)); - ASSERT_TRUE(orb.Start()); + fnet::frt::StandaloneFRT orb; + ASSERT_TRUE(orb.supervisor().Listen(0)); Slobrok slobrok; try { TestServer bar(MessageBusParams(), RPCNetworkParams(slobrok.config()) - .setListenPort(orb.GetListenPort())); + .setListenPort(orb.supervisor().GetListenPort())); EXPECT_TRUE(false); } catch (vespalib::Exception &e) { - EXPECT_EQUAL("Failed to start network.", - e.getMessage()); + EXPECT_EQUAL("Failed to start network.", e.getMessage()); } - orb.ShutDown(true); } void diff --git a/messagebus/src/tests/targetpool/targetpool.cpp b/messagebus/src/tests/targetpool/targetpool.cpp index 0ae70daf489..0e0e566f2be 100644 --- a/messagebus/src/tests/targetpool/targetpool.cpp +++ b/messagebus/src/tests/targetpool/targetpool.cpp @@ -38,8 +38,8 @@ Test::Main() TestServer srv3(Identity("srv3"), RoutingSpec(), slobrok); RPCServiceAddress adr3("", srv3.mb.getConnectionSpec()); - FRT_Supervisor orb(1024u, 1); - ASSERT_TRUE(orb.Start()); + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb = server.supervisor(); std::unique_ptr<PoolTimer> ptr(new PoolTimer()); PoolTimer &timer = *ptr; RPCTargetPool pool(std::move(ptr), 0.666); @@ -92,7 +92,5 @@ Test::Main() pool.flushTargets(false); EXPECT_EQUAL(0u, pool.size()); - orb.ShutDown(true); - TEST_DONE(); } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 358c9ebdeac..be25a73ece2 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -112,7 +112,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), _transport(std::make_unique<FNET_Transport>()), - _orb(std::make_unique<FRT_Supervisor>(_transport.get(), nullptr)), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), _targetPoolTask(_scheduler, *_targetPool), diff --git a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp index 416a25cfb7b..2e097e7141f 100644 --- a/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp +++ b/searchcore/src/apps/vespa-proton-cmd/vespa-proton-cmd.cpp @@ -20,19 +20,19 @@ private: App(const App &); App& operator=(const App &); - FRT_Supervisor *_supervisor; + std::unique_ptr<fnet::frt::StandaloneFRT> _frt; FRT_Target *_target; FRT_RPCRequest *_req; public: - App() : _supervisor(NULL), - _target(NULL), - _req(NULL) {} + App() : _frt(), + _target(nullptr), + _req(nullptr) {} virtual ~App() { - assert(_supervisor == NULL); - assert(_target == NULL); - assert(_req == NULL); + assert(!_frt); + assert(_target == nullptr); + assert(_req == nullptr); } int usage() @@ -49,14 +49,13 @@ public: void initRPC() { - _supervisor = new FRT_Supervisor(); - _req = _supervisor->AllocRPCRequest(); - _supervisor->Start(); + _frt = std::make_unique<fnet::frt::StandaloneFRT>(); + _req = _frt->supervisor().AllocRPCRequest(); } void invokeRPC(bool print, double timeout=5.0) { - if (_req == NULL) + if (_req == nullptr) return; _target->InvokeSync(_req, timeout); @@ -66,18 +65,16 @@ public: void finiRPC() { - if (_req != NULL) { + if (_req != nullptr) { _req->SubRef(); - _req = NULL; + _req = nullptr; } - if (_target != NULL) { + if (_target != nullptr) { _target->SubRef(); - _target = NULL; + _target = nullptr; } - if (_supervisor != NULL) { - _supervisor->ShutDown(true); - delete _supervisor; - _supervisor = NULL; + if (_frt) { + _frt.reset(); } } @@ -115,7 +112,7 @@ public: try { slobrok::ConfiguratorFactory sbcfg("admin/slobrok.0"); - slobrok::api::MirrorAPI sbmirror(*_supervisor, sbcfg); + slobrok::api::MirrorAPI sbmirror(_frt->supervisor(), sbcfg); for (int timeout = 1; timeout < 20; timeout++) { if (!sbmirror.ready()) { FastOS_Thread::Sleep(50*timeout); @@ -167,7 +164,7 @@ public: try { slobrok::ConfiguratorFactory sbcfg("admin/slobrok.0"); - slobrok::api::MirrorAPI sbmirror(*_supervisor, sbcfg); + slobrok::api::MirrorAPI sbmirror(_frt->supervisor(), sbcfg); for (int timeout = 1; timeout < 20; timeout++) { if (!sbmirror.ready()) { FastOS_Thread::Sleep(50*timeout); @@ -249,9 +246,9 @@ public: } if (port != 0) { - _target = _supervisor->GetTarget(port); + _target = _frt->supervisor().GetTarget(port); } else { - _target = _supervisor->GetTarget(spec.c_str()); + _target = _frt->supervisor().GetTarget(spec.c_str()); } bool invoked = false; @@ -350,7 +347,7 @@ void App::monitorLoop() { for (;;) { - FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); + FRT_RPCRequest *req = _frt->supervisor().AllocRPCRequest(); req->SetMethodName("pandora.rtc.getIncrementalState"); FRT_Values ¶ms = *req->GetParams(); params.AddInt32(2000); @@ -365,7 +362,7 @@ App::monitorLoop() FRT_Value &names = rvals.GetValue(0); FRT_Value &values = rvals.GetValue(1); struct timeval tnow; - gettimeofday(&tnow, NULL); + gettimeofday(&tnow, nullptr); for (unsigned int i = 0; i < names._string_array._len && diff --git a/searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp b/searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp index eaff3b90d78..437482dddd2 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/common/rpc.cpp @@ -10,7 +10,7 @@ extern char FastS_VersionTag[]; FastS_RPC::FastS_RPC(FastS_AppContext *appCtx) : _appCtx(appCtx), _transport(), - _supervisor(&_transport, _appCtx->GetThreadPool()), + _supervisor(&_transport), _sbregister(_supervisor, slobrok::ConfiguratorFactory("admin/slobrok.0")) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp index 2d63f192189..bcf767bb873 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp @@ -6,6 +6,7 @@ #include <vespa/searchcore/proton/matchengine/matchengine.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/transport.h> #include <vespa/log/log.h> LOG_SETUP(".proton.server.rtchooks"); @@ -195,7 +196,8 @@ RPCHooksBase::Params::~Params() = default; RPCHooksBase::RPCHooksBase(Params ¶ms) : _proton(params.proton), _docsumByRPC(new DocsumByRPC(_proton.getDocsumBySlime())), - _orb(std::make_unique<FRT_Supervisor>()), + _transport(std::make_unique<FNET_Transport>()), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _proto_rpc_adapter(std::make_unique<ProtoRpcAdapter>( _proton.get_search_server(), _proton.get_docsum_server(), @@ -212,7 +214,7 @@ RPCHooksBase::open(Params & params) initRPC(); _regAPI.registerName((params.identity + "/realtimecontroller").c_str()); _orb->Listen(params.rtcPort); - _orb->Start(); + _transport->Start(&_proton.getThreadPool()); LOG(debug, "started monitoring interface"); } @@ -222,7 +224,7 @@ void RPCHooksBase::close() { LOG(info, "shutting down monitoring interface"); - _orb->ShutDown(true); + _transport->ShutDown(true); _executor.shutdown(); { std::lock_guard<std::mutex> guard(_stateLock); diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h index ad0a69fcd55..f16237381d6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h @@ -12,6 +12,8 @@ #include <mutex> #include <condition_variable> +class FNET_Transport; + namespace proton { class Proton; @@ -64,6 +66,7 @@ private: Proton & _proton; std::unique_ptr<DocsumByRPC> _docsumByRPC; + std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _orb; std::unique_ptr<ProtoRpcAdapter> _proto_rpc_adapter; slobrok::api::RegisterAPI _regAPI; diff --git a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp index 60cf1af13a0..6747e4e741e 100644 --- a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp +++ b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp @@ -62,23 +62,20 @@ struct MyMonitorServer : MonitorServer { }; struct ProtoRpcAdapterTest : ::testing::Test { - FRT_Supervisor orb; + fnet::frt::StandaloneFRT server; MySearchServer search; MyDocsumServer docsum; MyMonitorServer monitor; ProtoRpcAdapter adapter; ProtoRpcAdapterTest() - : orb(), adapter(search, docsum, monitor, orb) + : server(), adapter(search, docsum, monitor, server.supervisor()) { - orb.Listen(0); - orb.Start(); + server.supervisor().Listen(0); } FRT_Target *connect() { - return orb.GetTarget(orb.GetListenPort()); - } - ~ProtoRpcAdapterTest() { - orb.ShutDown(true); + return server.supervisor().GetTarget(server.supervisor().GetListenPort()); } + ~ProtoRpcAdapterTest() = default; }; //----------------------------------------------------------------------------- diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp index 767c8b45e10..2c6c1e249f4 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp @@ -4,6 +4,8 @@ #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/target.h> #include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> #include <thread> #include <vespa/log/log.h> @@ -47,19 +49,21 @@ TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : _executor(1, 128 * 1024, translogclient_rpc_callback), _rpcTarget(rpcTarget), _sessions(), - _supervisor(std::make_unique<FRT_Supervisor>()), + _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _target(NULL) { reconnect(); exportRPC(*_supervisor); - _supervisor->Start(); + _transport->Start(_threadPool.get()); } TransLogClient::~TransLogClient() { disconnect(); _executor.shutdown().sync(); - _supervisor->ShutDown(true); + _transport->ShutDown(true); } bool TransLogClient::reconnect() diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h index 267d6e3b0ed..38c30cd5b4c 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h @@ -10,6 +10,7 @@ #include <map> #include <vector> +class FNET_Transport; class FRT_Supervisor; class FRT_Target; @@ -122,7 +123,9 @@ private: SessionMap _sessions; //Brute force lock for subscriptions. For multithread safety. vespalib::Lock _lock; - std::unique_ptr<FRT_Supervisor> _supervisor; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _supervisor; FRT_Target * _target; }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 4b3e7bddb07..6d11ab1f5eb 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -6,6 +6,7 @@ #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fnet/task.h> +#include <vespa/fnet/transport.h> #include <fstream> #include <vespa/log/log.h> @@ -90,8 +91,9 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con _defaultCrcType(defaultCrcType), _commitExecutor(maxThreads, 128*1024), _sessionExecutor(maxThreads, 128*1024), - _threadPool(8192, 1), - _supervisor(std::make_unique<FRT_Supervisor>()), + _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _domains(), _reqQ(), _fileHeaderContext(fileHeaderContext) @@ -119,7 +121,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con bool listenOk(false); for (int i(600); !listenOk && i; i--) { if (_supervisor->Listen(listenSpec)) { - _supervisor->Start(); + _transport->Start(_threadPool.get()); listenOk = true; } else { LOG(warning, "Failed listening at port %s trying for %d seconds more.", listenSpec, i); @@ -135,7 +137,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con } else { throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); } - start(_threadPool); + start(*_threadPool); } TransLogServer::~TransLogServer() @@ -146,7 +148,7 @@ TransLogServer::~TransLogServer() _commitExecutor.sync(); _sessionExecutor.shutdown(); _sessionExecutor.sync(); - _supervisor->ShutDown(true); + _transport->ShutDown(true); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 8aedfef6d8d..0d65f36e07d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -10,6 +10,7 @@ class FRT_Supervisor; +class FNET_Transport; namespace search::common { class FileHeaderContext; } @@ -85,7 +86,8 @@ private: const DomainPart::Crc _defaultCrcType; vespalib::ThreadStackExecutor _commitExecutor; vespalib::ThreadStackExecutor _sessionExecutor; - FastOS_ThreadPool _threadPool; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _supervisor; DomainList _domains; mutable std::mutex _lock; // Protects _domains diff --git a/slobrok/src/apps/check_slobrok/check_slobrok.cpp b/slobrok/src/apps/check_slobrok/check_slobrok.cpp index ed72c459e44..0342cccb221 100644 --- a/slobrok/src/apps/check_slobrok/check_slobrok.cpp +++ b/slobrok/src/apps/check_slobrok/check_slobrok.cpp @@ -13,14 +13,14 @@ LOG_SETUP("check_slobrok"); class Slobrok_Checker : public FastOS_Application { private: - std::unique_ptr<FRT_Supervisor> _supervisor; + std::unique_ptr<fnet::frt::StandaloneFRT> _server; FRT_Target *_target; Slobrok_Checker(const Slobrok_Checker &); Slobrok_Checker &operator=(const Slobrok_Checker &); public: - Slobrok_Checker() : _supervisor(), _target(nullptr) {} + Slobrok_Checker() : _server(), _target(nullptr) {} virtual ~Slobrok_Checker(); int usage(); void initRPC(const char *spec); @@ -30,7 +30,7 @@ public: Slobrok_Checker::~Slobrok_Checker() { - LOG_ASSERT( !_supervisor); + LOG_ASSERT( !_server); LOG_ASSERT(_target == nullptr); } @@ -46,9 +46,8 @@ Slobrok_Checker::usage() void Slobrok_Checker::initRPC(const char *spec) { - _supervisor = std::make_unique<FRT_Supervisor>(); - _target = _supervisor->GetTarget(spec); - _supervisor->Start(); + _server = std::make_unique<fnet::frt::StandaloneFRT>(); + _target = _server->supervisor().GetTarget(spec); } @@ -59,9 +58,8 @@ Slobrok_Checker::finiRPC() _target->SubRef(); _target = nullptr; } - if (_supervisor) { - _supervisor->ShutDown(true); - _supervisor.reset(); + if (_server) { + _server.reset(); } } @@ -82,7 +80,7 @@ Slobrok_Checker::Main() initRPC(tmp.str().c_str()); } - FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); + FRT_RPCRequest *req = _server->supervisor().AllocRPCRequest(); req->SetMethodName("slobrok.system.version"); _target->InvokeSync(req, 5.0); diff --git a/slobrok/src/apps/sbcmd/sbcmd.cpp b/slobrok/src/apps/sbcmd/sbcmd.cpp index c4b76bd7ed0..f32155ae80b 100644 --- a/slobrok/src/apps/sbcmd/sbcmd.cpp +++ b/slobrok/src/apps/sbcmd/sbcmd.cpp @@ -12,14 +12,14 @@ LOG_SETUP("vespa-slobrok-cmd"); class Slobrok_CMD : public FastOS_Application { private: - std::unique_ptr<FRT_Supervisor> _supervisor; + std::unique_ptr<fnet::frt::StandaloneFRT> _server; FRT_Target *_target; Slobrok_CMD(const Slobrok_CMD &); Slobrok_CMD &operator=(const Slobrok_CMD &); public: - Slobrok_CMD() : _supervisor(), _target(nullptr) {} + Slobrok_CMD() : _server(), _target(nullptr) {} virtual ~Slobrok_CMD(); int usage(); void initRPC(const char *spec); @@ -29,7 +29,7 @@ public: Slobrok_CMD::~Slobrok_CMD() { - LOG_ASSERT(! _supervisor); + LOG_ASSERT(! _server); LOG_ASSERT(_target == nullptr); } @@ -56,9 +56,8 @@ Slobrok_CMD::usage() void Slobrok_CMD::initRPC(const char *spec) { - _supervisor = std::make_unique<FRT_Supervisor>(); - _target = _supervisor->GetTarget(spec); - _supervisor->Start(); + _server = std::make_unique<fnet::frt::StandaloneFRT>(); + _target = _server->supervisor().GetTarget(spec); } @@ -69,9 +68,8 @@ Slobrok_CMD::finiRPC() _target->SubRef(); _target = nullptr; } - if (_supervisor) { - _supervisor->ShutDown(true); - _supervisor.reset(); + if (_server) { + _server.reset(); } } @@ -95,7 +93,7 @@ Slobrok_CMD::Main() bool threeTables = false; bool twoTables = false; - FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); + FRT_RPCRequest *req = _server->supervisor().AllocRPCRequest(); req->SetMethodName(_argv[2]); if (strcmp(_argv[2], "slobrok.admin.listAllRpcServers") == 0) { diff --git a/slobrok/src/tests/configure/configure.cpp b/slobrok/src/tests/configure/configure.cpp index 7b1c32bceef..2783d0e3ebf 100644 --- a/slobrok/src/tests/configure/configure.cpp +++ b/slobrok/src/tests/configure/configure.cpp @@ -97,8 +97,8 @@ Test::Main() { TEST_INIT("configure_test"); - FRT_Supervisor orb1; - FRT_Supervisor orb2; + fnet::frt::StandaloneFRT orb1; + fnet::frt::StandaloneFRT orb2; config::ConfigSet set; cloud::config::SlobroksConfigBuilder srv1Builder; @@ -141,18 +141,16 @@ Test::Main() SlobrokServer serverOne(srvConfig1); SlobrokServer serverTwo(srvConfig2); - MirrorAPI mirror1(orb1, cliConfig3); // NB this one will be changed - MirrorAPI mirror2(orb2, cliConfig2); + MirrorAPI mirror1(orb1.supervisor(), cliConfig3); // NB this one will be changed + MirrorAPI mirror2(orb2.supervisor(), cliConfig2); - RegisterAPI reg1(orb1, cliConfig1); - RegisterAPI reg2(orb2, cliConfig2); + RegisterAPI reg1(orb1.supervisor(), cliConfig1); + RegisterAPI reg2(orb2.supervisor(), cliConfig2); - orb1.Listen(18526); - orb2.Listen(18527); - std::string myspec1 = createSpec(orb1.GetListenPort()); - std::string myspec2 = createSpec(orb2.GetListenPort()); - orb1.Start(); - orb2.Start(); + orb1.supervisor().Listen(18526); + orb2.supervisor().Listen(18527); + std::string myspec1 = createSpec(orb1.supervisor().GetListenPort()); + std::string myspec2 = createSpec(orb2.supervisor().GetListenPort()); reg1.registerName("A"); reg2.registerName("B"); @@ -183,16 +181,14 @@ Test::Main() reg1.registerName("A"); reg2.registerName("B"); - FRT_Supervisor orb3; - FRT_Supervisor orb4; - RegisterAPI reg3(orb3, cliConfig1); - RegisterAPI reg4(orb4, cliConfig2); - orb3.Listen(18528); - orb4.Listen(18529); - std::string myspec3 = createSpec(orb3.GetListenPort()); - std::string myspec4 = createSpec(orb4.GetListenPort()); - orb3.Start(); - orb4.Start(); + fnet::frt::StandaloneFRT orb3; + fnet::frt::StandaloneFRT orb4; + RegisterAPI reg3(orb3.supervisor(), cliConfig1); + RegisterAPI reg4(orb4.supervisor(), cliConfig2); + orb3.supervisor().Listen(18528); + orb4.supervisor().Listen(18529); + std::string myspec3 = createSpec(orb3.supervisor().GetListenPort()); + std::string myspec4 = createSpec(orb4.supervisor().GetListenPort()); reg3.registerName("B"); reg4.registerName("A"); @@ -217,9 +213,5 @@ Test::Main() serverOne.stop(); serverTwo.stop(); - orb1.ShutDown(true); - orb2.ShutDown(true); - orb3.ShutDown(true); - orb4.ShutDown(true); TEST_DONE(); } diff --git a/slobrok/src/tests/mirrorapi/mirrorapi.cpp b/slobrok/src/tests/mirrorapi/mirrorapi.cpp index f77dfd80986..23cd201c551 100644 --- a/slobrok/src/tests/mirrorapi/mirrorapi.cpp +++ b/slobrok/src/tests/mirrorapi/mirrorapi.cpp @@ -21,7 +21,7 @@ TEST_SETUP(Test); class Server : public FRT_Invokable { private: - FRT_Supervisor _orb; + fnet::frt::StandaloneFRT _server; std::string _name; std::string _slobrokSpec; @@ -34,12 +34,12 @@ public: Server::Server(std::string name, int port, std::string slobrokSpec) - : _orb(), + : _server(), _name(name), _slobrokSpec(slobrokSpec) { { - FRT_ReflectionBuilder rb(&_orb); + FRT_ReflectionBuilder rb(&_server.supervisor()); //--------------------------------------------------------------------- rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", FRT_METHOD(Server::rpc_listNamesServed), this); @@ -47,8 +47,7 @@ Server::Server(std::string name, int port, std::string slobrokSpec) rb.ReturnDesc("names", "The rpcserver names on this server"); //--------------------------------------------------------------------- } - _orb.Listen(port); - _orb.Start(); + _server.supervisor().Listen(port); } @@ -56,14 +55,14 @@ void Server::reg() { char spec[64]; - sprintf(spec, "tcp/localhost:%d", _orb.GetListenPort()); + sprintf(spec, "tcp/localhost:%d", _server.supervisor().GetListenPort()); - FRT_RPCRequest *req = _orb.AllocRPCRequest(); + FRT_RPCRequest *req = _server.supervisor().AllocRPCRequest(); req->SetMethodName("slobrok.registerRpcServer"); req->GetParams()->AddString(_name.c_str()); req->GetParams()->AddString(spec); - FRT_Target *sb = _orb.GetTarget(_slobrokSpec.c_str()); + FRT_Target *sb = _server.supervisor().GetTarget(_slobrokSpec.c_str()); sb->InvokeSync(req, 5.0); sb->SubRef(); req->SubRef(); @@ -79,10 +78,7 @@ Server::rpc_listNamesServed(FRT_RPCRequest *req) } -Server::~Server() -{ - _orb.ShutDown(true); -} +Server::~Server() = default; //----------------------------------------------------------------------------- @@ -140,10 +136,9 @@ Test::Main() cloud::config::SlobroksConfig::Slobrok slobrok; slobrok.connectionspec = "tcp/localhost:18501"; specBuilder.slobrok.push_back(slobrok); - FRT_Supervisor orb; - MirrorAPI mirror(orb, config::ConfigUri::createFromInstance(specBuilder)); + fnet::frt::StandaloneFRT server; + MirrorAPI mirror(server.supervisor(), config::ConfigUri::createFromInstance(specBuilder)); EXPECT_TRUE(!mirror.ready()); - orb.Start(); FastOS_Thread::Sleep(1000); a.reg(); @@ -224,6 +219,5 @@ Test::Main() .add("A/x/w", "tcp/localhost:18502"))); mock.stop(); - orb.ShutDown(true); TEST_DONE(); } diff --git a/slobrok/src/tests/oldapi/old.cpp b/slobrok/src/tests/oldapi/old.cpp index 42cec186a08..26cf94613bd 100644 --- a/slobrok/src/tests/oldapi/old.cpp +++ b/slobrok/src/tests/oldapi/old.cpp @@ -19,9 +19,9 @@ TEST_SETUP(Test); class Server : public FRT_Invokable { private: - FRT_Supervisor _orb; - std::string _name; - std::string _slobrokSpec; + fnet::frt::StandaloneFRT _server; + std::string _name; + std::string _slobrokSpec; public: Server(std::string name, int port, std::string slobrokSpec); @@ -32,12 +32,12 @@ public: Server::Server(std::string name, int port, std::string slobrokSpec) - : _orb(), + : _server(), _name(name), _slobrokSpec(slobrokSpec) { { - FRT_ReflectionBuilder rb(&_orb); + FRT_ReflectionBuilder rb(&_server.supervisor()); //--------------------------------------------------------------------- rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", FRT_METHOD(Server::rpc_listNamesServed), this); @@ -45,8 +45,7 @@ Server::Server(std::string name, int port, std::string slobrokSpec) rb.ReturnDesc("names", "The rpcserver names on this server"); //--------------------------------------------------------------------- } - _orb.Listen(port); - _orb.Start(); + _server.supervisor().Listen(port); } @@ -54,14 +53,14 @@ void Server::reg() { char spec[64]; - sprintf(spec, "tcp/localhost:%d", _orb.GetListenPort()); + sprintf(spec, "tcp/localhost:%d", _server.supervisor().GetListenPort()); - FRT_RPCRequest *req = _orb.AllocRPCRequest(); + FRT_RPCRequest *req = _server.supervisor().AllocRPCRequest(); req->SetMethodName("slobrok.registerRpcServer"); req->GetParams()->AddString(_name.c_str()); req->GetParams()->AddString(spec); - FRT_Target *sb = _orb.GetTarget(_slobrokSpec.c_str()); + FRT_Target *sb = _server.supervisor().GetTarget(_slobrokSpec.c_str()); sb->InvokeSync(req, 5.0); sb->SubRef(); req->SubRef(); @@ -77,10 +76,7 @@ Server::rpc_listNamesServed(FRT_RPCRequest *req) } -Server::~Server() -{ - _orb.ShutDown(true); -} +Server::~Server() = default; //----------------------------------------------------------------------------- @@ -136,10 +132,9 @@ Test::Main() std::vector<std::string> slobrokSpecs; slobrokSpecs.push_back("tcp/localhost:18531"); - FRT_Supervisor orb; - MirrorOld mirror(orb, slobrokSpecs); + fnet::frt::StandaloneFRT server; + MirrorOld mirror(server.supervisor(), slobrokSpecs); EXPECT_TRUE(!mirror.ready()); - orb.Start(); FastOS_Thread::Sleep(1000); a.reg(); @@ -217,6 +212,5 @@ Test::Main() .add("A/x/w", "tcp/localhost:18532"))); mock.stop(); - orb.ShutDown(true); TEST_DONE(); } diff --git a/slobrok/src/tests/registerapi/registerapi.cpp b/slobrok/src/tests/registerapi/registerapi.cpp index 2b356319ba0..ac7e662c6f2 100644 --- a/slobrok/src/tests/registerapi/registerapi.cpp +++ b/slobrok/src/tests/registerapi/registerapi.cpp @@ -83,12 +83,12 @@ Test::Main() slobrokSpecs.slobrok.push_back(sb); slobrok::ConfiguratorFactory config(config::ConfigUri::createFromInstance(slobrokSpecs)); - FRT_Supervisor orb; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb = server.supervisor(); RegisterAPI reg(orb, config); MirrorAPI mirror(orb, config); orb.Listen(18549); std::string myspec = createSpec(orb); - orb.Start(); reg.registerName("A/x/w"); EXPECT_TRUE(reg.busy()); @@ -216,6 +216,5 @@ Test::Main() .add("F/y/w", myspec.c_str()))); mock.stop(); - orb.ShutDown(true); TEST_DONE(); } diff --git a/slobrok/src/tests/standalone/standalone.cpp b/slobrok/src/tests/standalone/standalone.cpp index 136f8125c8b..9d3fd694ee1 100644 --- a/slobrok/src/tests/standalone/standalone.cpp +++ b/slobrok/src/tests/standalone/standalone.cpp @@ -9,7 +9,7 @@ class Server : public FRT_Invokable { private: - FRT_Supervisor _orb; + fnet::frt::StandaloneFRT _server; std::string _name; public: @@ -20,11 +20,11 @@ public: Server::Server(std::string name, int port) - : _orb(), + : _server(), _name(name) { { - FRT_ReflectionBuilder rb(&_orb); + FRT_ReflectionBuilder rb(&_server.supervisor()); //--------------------------------------------------------------------- rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", FRT_METHOD(Server::rpc_listNamesServed), this); @@ -32,8 +32,7 @@ Server::Server(std::string name, int port) rb.ReturnDesc("names", "The rpcserver names on this server"); //--------------------------------------------------------------------- } - _orb.Listen(port); - _orb.Start(); + _server.supervisor().Listen(port); } @@ -46,16 +45,13 @@ Server::rpc_listNamesServed(FRT_RPCRequest *req) } -Server::~Server() -{ - _orb.ShutDown(true); -} +Server::~Server() = default; namespace { bool checkOk(FRT_RPCRequest *req) { - if (req == NULL) { + if (req == nullptr) { fprintf(stderr, "req is null pointer, this is bad\n"); return false; } @@ -80,7 +76,7 @@ private: public: SubReferer(T* &t) : _t(t) {} ~SubReferer() { - if (_t != NULL) _t->SubRef(); + if (_t != nullptr) _t->SubRef(); } }; @@ -118,14 +114,13 @@ TEST("standalone") { slobrok::SlobrokServer slobrokServer(18541); Stopper<slobrok::SlobrokServer> ssCleaner(slobrokServer); - FRT_Supervisor orb; - orb.Start(); - ShutDowner<FRT_Supervisor> orbCleaner(orb); + fnet::frt::StandaloneFRT server; + FRT_Supervisor & orb = server.supervisor(); FRT_Target *sb = orb.GetTarget(18541); SubReferer<FRT_Target> sbCleaner(sb); - FRT_RPCRequest *req = NULL; + FRT_RPCRequest *req = nullptr; SubReferer<FRT_RPCRequest> reqCleaner(req); for (int retry=0; retry < 5*61; retry++) { diff --git a/slobrok/src/tests/startsome/rpc_info.cpp b/slobrok/src/tests/startsome/rpc_info.cpp index 036c486fe4f..0f8739d12be 100644 --- a/slobrok/src/tests/startsome/rpc_info.cpp +++ b/slobrok/src/tests/startsome/rpc_info.cpp @@ -12,16 +12,16 @@ public: void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor) { - if ((*req) != NULL) + if ((*req) != nullptr) (*req)->SubRef(); (*req) = supervisor->AllocRPCRequest(); } void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2) { - if (r1 != NULL) + if (r1 != nullptr) r1->SubRef(); - if (r2 != NULL) + if (r2 != nullptr) r2->SubRef(); } @@ -75,11 +75,11 @@ public: } bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0); - FRT_Supervisor supervisor; + fnet::frt::StandaloneFRT server; + FRT_Supervisor & supervisor = server.supervisor(); FRT_Target *target = supervisor.GetTarget(_argv[1]); - FRT_RPCRequest *m_list = NULL; - FRT_RPCRequest *info = NULL; - supervisor.Start(); + FRT_RPCRequest *m_list = nullptr; + FRT_RPCRequest *info = nullptr; GetReq(&info, &supervisor); info->SetMethodName("frt.rpc.ping"); @@ -87,7 +87,6 @@ public: if (info->IsError()) { fprintf(stderr, "Error talking to %s\n", _argv[1]); FreeReqs(m_list, info); - supervisor.ShutDown(true); return 1; } @@ -125,7 +124,6 @@ public: } FreeReqs(m_list, info); target->SubRef(); - supervisor.ShutDown(true); return 0; } }; diff --git a/slobrok/src/tests/startsome/tstdst.cpp b/slobrok/src/tests/startsome/tstdst.cpp index 4723b3819d7..c119ed3c026 100644 --- a/slobrok/src/tests/startsome/tstdst.cpp +++ b/slobrok/src/tests/startsome/tstdst.cpp @@ -132,7 +132,7 @@ RPCHooks::rpc_stop(FRT_RPCRequest *req) TstEnv::TstEnv(int sbp, int myp, const char *n) : _transport(new FNET_Transport()), - _supervisor(new FRT_Supervisor(_transport, NULL)), + _supervisor(new FRT_Supervisor(_transport)), _myport(myp), _sbport(sbp), _rpcHooks(NULL), diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index 4e510b61e70..91a283b17f3 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -86,7 +86,7 @@ ConfigTask::PerformTask() SBEnv::SBEnv(const ConfigShim &shim) : _transport(std::make_unique<FNET_Transport>()), - _supervisor(std::make_unique<FRT_Supervisor>(_transport.get(), nullptr)), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _configShim(shim), _configurator(shim.factory().create(*this)), _shuttingDown(false), diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index ec488b25714..2f1cdc74ce1 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -7,6 +7,7 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/host_name.h> #include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/transport.h> #include <sstream> #include <vespa/log/log.h> @@ -16,7 +17,9 @@ namespace storage { FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::ConfigUri & configUri, uint32_t port) : _messageEnqueuer(messageEnqueuer), - _orb(std::make_unique<FRT_Supervisor>()), + _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _transport(std::make_unique<FNET_Transport>()), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _closed(false), _slobrokRegister(*_orb, configUri) { @@ -26,7 +29,7 @@ FNetListener::FNetListener(MessageEnqueuer& messageEnqueuer, const config::Confi ost << "Failed to listen to RPC port " << port << "."; throw vespalib::IllegalStateException(ost.str(), VESPA_STRLOC); } - _orb->Start(); + _transport->Start(_threadPool.get()); } FNetListener::~FNetListener() @@ -57,7 +60,7 @@ FNetListener::close() { _closed = true; _slobrokRegister.unregisterName(_handle); - _orb->ShutDown(true); + _transport->ShutDown(true); } void diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h index 2097be15491..8d24311bc57 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.h +++ b/storage/src/vespa/storage/storageserver/fnetlistener.h @@ -4,6 +4,8 @@ #include <vespa/slobrok/sbregister.h> #include <atomic> +class FNET_Transport; + namespace storage { namespace api { class StorageMessage; } @@ -33,11 +35,13 @@ public: int getListenPort() const; private: - MessageEnqueuer& _messageEnqueuer; - std::unique_ptr<FRT_Supervisor> _orb; - std::atomic<bool> _closed; - slobrok::api::RegisterAPI _slobrokRegister; - vespalib::string _handle; + MessageEnqueuer& _messageEnqueuer; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + std::atomic<bool> _closed; + slobrok::api::RegisterAPI _slobrokRegister; + vespalib::string _handle; void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest *req); }; diff --git a/storage/src/vespa/storage/tools/storage-cmd.cpp b/storage/src/vespa/storage/tools/storage-cmd.cpp index 564574627a7..daaa890873f 100644 --- a/storage/src/vespa/storage/tools/storage-cmd.cpp +++ b/storage/src/vespa/storage/tools/storage-cmd.cpp @@ -18,22 +18,22 @@ private: const char *value = param + 2; switch (param[0]) { case 'b': - req->GetParams()->AddInt8(strtoll(value, NULL, 0)); + req->GetParams()->AddInt8(strtoll(value, nullptr, 0)); break; case 'h': - req->GetParams()->AddInt16(strtoll(value, NULL, 0)); + req->GetParams()->AddInt16(strtoll(value, nullptr, 0)); break; case 'i': - req->GetParams()->AddInt32(strtoll(value, NULL, 0)); + req->GetParams()->AddInt32(strtoll(value, nullptr, 0)); break; case 'l': - req->GetParams()->AddInt64(strtoll(value, NULL, 0)); + req->GetParams()->AddInt64(strtoll(value, nullptr, 0)); break; case 'f': - req->GetParams()->AddFloat(vespalib::locale::c::strtod(value, NULL)); + req->GetParams()->AddFloat(vespalib::locale::c::strtod(value, nullptr)); break; case 'd': - req->GetParams()->AddDouble(vespalib::locale::c::strtod(value, NULL)); + req->GetParams()->AddDouble(vespalib::locale::c::strtod(value, nullptr)); break; case 's': req->GetParams()->AddString(value); @@ -55,11 +55,10 @@ public: return 1; } int retCode = 0; - FRT_Supervisor supervisor; - supervisor.Start(); + fnet::frt::StandaloneFRT supervisor; slobrok::ConfiguratorFactory sbcfg("admin/slobrok.0"); - slobrok::api::MirrorAPI mirror(supervisor, sbcfg); + slobrok::api::MirrorAPI mirror(supervisor.supervisor(), sbcfg); while (!mirror.ready()) { FastOS_Thread::Sleep(10); @@ -72,11 +71,11 @@ public: } for (size_t j = 0; j < list.size(); j++) { - FRT_Target *target = supervisor.GetTarget(list[j].second.c_str()); + FRT_Target *target = supervisor.supervisor().GetTarget(list[j].second.c_str()); // If not fleet controller, need to connect first. - if (strstr(_argv[1], "fleetcontroller") == NULL) { - FRT_RPCRequest *req = supervisor.AllocRPCRequest(); + if (strstr(_argv[1], "fleetcontroller") == nullptr) { + FRT_RPCRequest *req = supervisor.supervisor().AllocRPCRequest(); req->SetMethodName("vespa.storage.connect"); req->GetParams()->AddString(_argv[1]); target->InvokeSync(req, 10.0); @@ -90,7 +89,7 @@ public: req->SubRef(); } - FRT_RPCRequest *req = supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = supervisor.supervisor().AllocRPCRequest(); req->SetMethodName(_argv[2]); for (int i = 3; i < _argc; ++i) { @@ -115,7 +114,6 @@ public: req->SubRef(); target->SubRef(); } - supervisor.ShutDown(true); return retCode; } }; diff --git a/storageserver/src/tests/storageservertest.cpp b/storageserver/src/tests/storageservertest.cpp index 2ef770b68b1..413acf89f27 100644 --- a/storageserver/src/tests/storageservertest.cpp +++ b/storageserver/src/tests/storageservertest.cpp @@ -33,51 +33,50 @@ using document::test::makeDocumentBucket; namespace storage { namespace { - uint64_t getTimeInMillis() { - struct timeval t; - gettimeofday(&t, 0); - return (t.tv_sec * uint64_t(1000)) + (t.tv_usec / uint64_t(1000)); - } - - class SlobrokMirror { - config::ConfigUri config; - FRT_Supervisor visor; - std::unique_ptr<slobrok::api::MirrorAPI> mirror; - public: - SlobrokMirror(const config::ConfigUri & cfg) : config(cfg) {} +uint64_t getTimeInMillis() { + struct timeval t; + gettimeofday(&t, 0); + return (t.tv_sec * uint64_t(1000)) + (t.tv_usec / uint64_t(1000)); +} - void init(uint32_t timeoutms) { - uint64_t timeout = getTimeInMillis() + timeoutms; - visor.Start(); - mirror.reset(new slobrok::api::MirrorAPI(visor, config)); - while (!mirror->ready()) { - if (getTimeInMillis() > timeout) - throw vespalib::IllegalStateException( - "Failed to initialize slobrok mirror within " - "timeout.", VESPA_STRLOC); - FastOS_Thread::Sleep(1); - } +class SlobrokMirror { + config::ConfigUri config; + fnet::frt::StandaloneFRT visor; + std::unique_ptr<slobrok::api::MirrorAPI> mirror; + +public: + SlobrokMirror(const config::ConfigUri &cfg) : config(cfg) {} + + void init(uint32_t timeoutms) { + uint64_t timeout = getTimeInMillis() + timeoutms; + mirror.reset(new slobrok::api::MirrorAPI(visor.supervisor(), config)); + while (!mirror->ready()) { + if (getTimeInMillis() > timeout) + throw vespalib::IllegalStateException( + "Failed to initialize slobrok mirror within " + "timeout.", VESPA_STRLOC); + FastOS_Thread::Sleep(1); } + } - slobrok::api::MirrorAPI& getMirror() { - if (mirror.get() == 0) throw vespalib::IllegalStateException( + slobrok::api::MirrorAPI &getMirror() { + if (mirror.get() == 0) + throw vespalib::IllegalStateException( "You need to call init() before you can fetch mirror"); - return *mirror; - } - FRT_Supervisor& getSupervisor() { - if (mirror.get() == 0) throw vespalib::IllegalStateException( + return *mirror; + } + + FRT_Supervisor &getSupervisor() { + if (mirror.get() == 0) + throw vespalib::IllegalStateException( "You need to call init() before you can fetch supervisor"); - return visor; - } + return visor.supervisor(); + } + + ~SlobrokMirror() = default; +}; - ~SlobrokMirror() { - if (mirror) { - mirror.reset(); - visor.ShutDown(true); - } - } - }; } struct StorageServerTest : public CppUnit::TestFixture { diff --git a/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp b/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp index b24eb3e0373..9ec9249b671 100644 --- a/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp +++ b/vespaclient/src/vespa/vespaclient/vdsstates/statesapp.cpp @@ -247,17 +247,16 @@ struct StateApp : public FastOS_Application { } int run() { - FRT_Supervisor supervisor; - supervisor.Start(); + fnet::frt::StandaloneFRT supervisor; std::unique_ptr<slobrok::api::MirrorAPI> slobrok; if (_options._slobrokConnectionSpec == "") { config::ConfigUri config(_options._slobrokConfigId); - slobrok.reset(new slobrok::api::MirrorAPI(supervisor, config)); + slobrok.reset(new slobrok::api::MirrorAPI(supervisor.supervisor(), config)); } else { std::vector<std::string> specList; specList.push_back(_options._slobrokConnectionSpec); - slobrok.reset(new slobrok::api::MirrorAPI(supervisor, specList)); + slobrok.reset(new slobrok::api::MirrorAPI(supervisor.supervisor(), specList)); } LOG(debug, "Waiting for slobrok data to be available."); uint64_t startTime = getTimeInMillis(); @@ -287,7 +286,6 @@ struct StateApp : public FastOS_Application { } if (!slobrok->ready()) { std::cerr << "Slobrok not ready.\n"; - supervisor.ShutDown(true); return 1; } @@ -300,13 +298,12 @@ struct StateApp : public FastOS_Application { if (specs.size() == 0) { std::cerr << "No fleet controller could be found for '" << mask << ".\n"; - supervisor.ShutDown(true); return 1; } std::sort(specs.begin(), specs.end(), Sorter()); LOG(debug, "Found fleet controller %s - %s\n", specs.front().first.c_str(), specs.front().second.c_str()); - FRT_Target *target = supervisor.GetTarget(specs.front().second.c_str()); + FRT_Target *target = supervisor.supervisor().GetTarget(specs.front().second.c_str()); if (!_options._nonfriendlyOutput && _options._mode == GETNODESTATE) { std::cerr << @@ -341,7 +338,7 @@ struct StateApp : public FastOS_Application { indexes.push_back(_options._nodeIndex); } else { std::string hostname(vespa::Defaults::vespaHostname()); - FRT_RPCRequest* req = supervisor.AllocRPCRequest(); + FRT_RPCRequest* req = supervisor.supervisor().AllocRPCRequest(); req->SetMethodName("getNodeList"); target->InvokeSync(req, 10.0); std::string prefix = _options._cluster.getConfigId() + "/" + nodeType + "/"; @@ -380,7 +377,7 @@ struct StateApp : public FastOS_Application { break; } for (uint32_t j=0; j<indexes.size(); ++j) { - FRT_RPCRequest* req = supervisor.AllocRPCRequest(); + FRT_RPCRequest* req = supervisor.supervisor().AllocRPCRequest(); if (_options._mode == GETNODESTATE) { req->SetMethodName("getNodeState"); req->GetParams()->AddString(nodeType.c_str()); @@ -448,7 +445,6 @@ struct StateApp : public FastOS_Application { } } target->SubRef(); - supervisor.ShutDown(true); return (failed ? 1 : 0); } }; diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp index 5f2bbe8e11a..ae64db4d452 100644 --- a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp +++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp @@ -460,8 +460,7 @@ Application::printServices() const void Application::getServices(slobrok::api::IMirrorAPI::SpecList &ret, uint32_t depth) const { - FRT_Supervisor frt; - frt.Start(); + fnet::frt::StandaloneFRT frt; std::string pattern = "*"; for (uint32_t i = 0; i < depth; ++i) { @@ -469,14 +468,13 @@ Application::getServices(slobrok::api::IMirrorAPI::SpecList &ret, uint32_t depth for (slobrok::api::IMirrorAPI::SpecList::iterator it = lst.begin(); it != lst.end(); ++it) { - if (isService(frt, it->second)) { + if (isService(frt.supervisor(), it->second)) { ret.push_back(*it); } } pattern.append("/*"); } - frt.ShutDown(true); } bool |