summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-02-25 07:53:11 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-02-25 07:53:11 +0000
commita13c8dbf74a65db3861947d143e894dba57195c3 (patch)
treeb7cd7004cc8694fc5bba03741db60c0233428715 /searchcore
parent94719c46c713f986a650910715e5847f5defa407 (diff)
Move start and run into separate method to properly scope lifetime of objects and shutdown common transport last.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/proton/proton.cpp114
1 files changed, 59 insertions, 55 deletions
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp
index 3a31f941506..b806d2b0459 100644
--- a/searchcore/src/apps/proton/proton.cpp
+++ b/searchcore/src/apps/proton/proton.cpp
@@ -43,6 +43,7 @@ class App : public FastOS_Application
private:
static void setupSignals();
Params parseParams();
+ void startAndRun(FastOS_ThreadPool & threadPool, FNET_Transport & transport);
public:
int Main() override;
};
@@ -185,68 +186,72 @@ buildTransportConfig() {
}
+void
+App::startAndRun(FastOS_ThreadPool & threadPool, FNET_Transport & transport) {
+ Params params = parseParams();
+ LOG(debug, "identity: '%s'", params.identity.c_str());
+ LOG(debug, "serviceidentity: '%s'", params.serviceidentity.c_str());
+ LOG(debug, "subscribeTimeout: '%" PRIu64 "'", params.subscribeTimeout);
+ std::chrono::milliseconds subscribeTimeout(params.subscribeTimeout);
+
+ config::ConfigServerSpec configServerSpec(transport);
+ config::ConfigUri identityUri(params.identity, std::make_shared<config::ConfigContext>(configServerSpec));
+ auto protonUP = std::make_unique<proton::Proton>(threadPool, transport, identityUri,
+ _argc > 0 ? _argv[0] : "proton", subscribeTimeout);
+ proton::Proton & proton = *protonUP;
+ proton::BootstrapConfig::SP configSnapshot = proton.init();
+ if (proton.hasAbortedInit()) {
+ EV_STOPPING("proton", "shutdown after aborted init");
+ } else {
+ const ProtonConfig &protonConfig = configSnapshot->getProtonConfig();
+ vespalib::string basedir = protonConfig.basedir;
+ vespalib::mkdir(basedir, true);
+ {
+ ExitOnSignal exit_on_signal;
+ proton.init(configSnapshot);
+ }
+ configSnapshot.reset();
+ std::unique_ptr<ProtonServiceLayerProcess> spiProton;
+
+ if ( ! params.serviceidentity.empty()) {
+ spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton);
+ spiProton->setupConfig(subscribeTimeout);
+ spiProton->createNode();
+ EV_STARTED("servicelayer");
+ } else {
+ proton.getMetricManager().init(identityUri, proton.getThreadPool());
+ }
+ EV_STARTED("proton");
+ while (!(SIG::INT.check() || SIG::TERM.check() || (spiProton && spiProton->getNode().attemptedStopped()))) {
+ std::this_thread::sleep_for(1000ms);
+ if (spiProton && spiProton->configUpdated()) {
+ storage::ResumeGuard guard(spiProton->getNode().pause());
+ spiProton->updateConfig();
+ }
+ }
+ // Ensure metric manager and state server are shut down before we start tearing
+ // down any service layer components that they may end up transitively using.
+ protonUP->shutdown_config_fetching_and_state_exposing_components_once();
+ if (spiProton) {
+ spiProton->getNode().requestShutdown("controlled shutdown");
+ spiProton->shutdown();
+ EV_STOPPING("servicelayer", "clean shutdown");
+ }
+ protonUP.reset();
+ EV_STOPPING("proton", "clean shutdown");
+ }
+}
+
int
App::Main()
{
- proton::Proton::UP protonUP;
try {
setupSignals();
- Params params = parseParams();
- LOG(debug, "identity: '%s'", params.identity.c_str());
- LOG(debug, "serviceidentity: '%s'", params.serviceidentity.c_str());
- LOG(debug, "subscribeTimeout: '%" PRIu64 "'", params.subscribeTimeout);
- std::chrono::milliseconds subscribeTimeout(params.subscribeTimeout);
FastOS_ThreadPool threadPool(128_Ki);
-
FNET_Transport transport(buildTransportConfig());
transport.Start(&threadPool);
- config::ConfigServerSpec configServerSpec(transport);
- config::ConfigUri identityUri(params.identity, std::make_shared<config::ConfigContext>(configServerSpec));
- protonUP = std::make_unique<proton::Proton>(threadPool, transport, identityUri,
- _argc > 0 ? _argv[0] : "proton", subscribeTimeout);
- proton::Proton & proton = *protonUP;
- proton::BootstrapConfig::SP configSnapshot = proton.init();
- if (proton.hasAbortedInit()) {
- EV_STOPPING("proton", "shutdown after aborted init");
- } else {
- const ProtonConfig &protonConfig = configSnapshot->getProtonConfig();
- vespalib::string basedir = protonConfig.basedir;
- vespalib::mkdir(basedir, true);
- {
- ExitOnSignal exit_on_signal;
- proton.init(configSnapshot);
- }
- configSnapshot.reset();
- std::unique_ptr<ProtonServiceLayerProcess> spiProton;
-
- if ( ! params.serviceidentity.empty()) {
- spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton);
- spiProton->setupConfig(subscribeTimeout);
- spiProton->createNode();
- EV_STARTED("servicelayer");
- } else {
- proton.getMetricManager().init(identityUri, proton.getThreadPool());
- }
- EV_STARTED("proton");
- while (!(SIG::INT.check() || SIG::TERM.check() || (spiProton && spiProton->getNode().attemptedStopped()))) {
- std::this_thread::sleep_for(1000ms);
- if (spiProton && spiProton->configUpdated()) {
- storage::ResumeGuard guard(spiProton->getNode().pause());
- spiProton->updateConfig();
- }
- }
- // Ensure metric manager and state server are shut down before we start tearing
- // down any service layer components that they may end up transitively using.
- protonUP->shutdown_config_fetching_and_state_exposing_components_once();
- if (spiProton) {
- spiProton->getNode().requestShutdown("controlled shutdown");
- spiProton->shutdown();
- EV_STOPPING("servicelayer", "clean shutdown");
- }
- protonUP.reset();
- transport.ShutDown(true);
- EV_STOPPING("proton", "clean shutdown");
- }
+ startAndRun(threadPool, transport);
+ transport.ShutDown(true);
} catch (const vespalib::InvalidCommandLineArgumentsException &e) {
LOG(warning, "Invalid commandline arguments: '%s'", e.what());
return 1;
@@ -267,7 +272,6 @@ App::Main()
LOG(error, "Unknown IllegalStateException: '%s'", e.what());
throw;
}
- protonUP.reset();
LOG(debug, "Fully stopped, all destructors run.)");
return 0;
}