diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-24 12:50:31 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-24 12:50:31 +0000 |
commit | 9d8de3b985984987a4eae67e5d6d3a3fbdfc7efa (patch) | |
tree | 70eeb1abeaa1fd22dbfd8ed5f07409f643cc6a9d /searchcore | |
parent | 4ed0d2b6f00c82bc3ba8a3290576a7bedfdd1895 (diff) |
- Create the common transport and threadpool in the main loop.
- Also use the common transport for config subscriptions.
- Put The TransportConfig in the fnet namespace.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/apps/proton/proton.cpp | 15 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/proton.cpp | 25 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/proton.h | 11 |
3 files changed, 29 insertions, 22 deletions
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp index 61536f924a5..552e6688532 100644 --- a/searchcore/src/apps/proton/proton.cpp +++ b/searchcore/src/apps/proton/proton.cpp @@ -5,9 +5,13 @@ #include <vespa/metrics/metricmanager.h> #include <vespa/vespalib/util/signalhandler.h> #include <vespa/vespalib/util/programoptions.h> +#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/config/common/exceptions.h> +#include <vespa/config/common/configcontext.h> +#include <vespa/fnet/transport.h> +#include <vespa/fastos/thread.h> #include <vespa/fastos/app.h> #include <iostream> #include <thread> @@ -186,8 +190,14 @@ App::Main() LOG(debug, "serviceidentity: '%s'", params.serviceidentity.c_str()); LOG(debug, "subscribeTimeout: '%" PRIu64 "'", params.subscribeTimeout); std::chrono::milliseconds subscribeTimeout(params.subscribeTimeout); - config::ConfigUri identityUri(params.identity); - protonUP = std::make_unique<proton::Proton>(identityUri, _argc > 0 ? _argv[0] : "proton", subscribeTimeout); + FastOS_ThreadPool threadPool(128_Ki); + uint32_t numProcs = std::thread::hardware_concurrency(); + FNET_Transport transport(fnet::TransportConfig(std::max(1u, std::min(4u, numProcs/8)))); + transport.Start(&threadPool); + config::ConfigServerSpec configServerSpec(transport); + config::ConfigUri identityUri(params.identity, std::make_shared<config::ConfigContext>(configServerSpec)); + protonUP = std::make_unique<proton::Proton>(threadPool, transport, identityUri, + _argc > 0 ? _argv[0] : "proton", subscribeTimeout); proton::Proton & proton = *protonUP; proton::BootstrapConfig::SP configSnapshot = proton.init(); if (proton.hasAbortedInit()) { @@ -228,6 +238,7 @@ App::Main() EV_STOPPING("servicelayer", "clean shutdown"); } protonUP.reset(); + transport.ShutDown(true); EV_STOPPING("proton", "clean shutdown"); } } catch (const vespalib::InvalidCommandLineArgumentsException &e) { diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 0f20b0a7b47..8bd965497a8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -201,9 +201,8 @@ Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string & cluster } -Proton::Proton(const config::ConfigUri & configUri, - const vespalib::string &progName, - vespalib::duration subscribeTimeout) +Proton::Proton(FastOS_ThreadPool & threadPool, FNET_Transport & transport, const config::ConfigUri & configUri, + const vespalib::string &progName, vespalib::duration subscribeTimeout) : IProtonConfigurerOwner(), search::engine::MonitorServer(), IDocumentDBOwner(), @@ -211,8 +210,8 @@ Proton::Proton(const config::ConfigUri & configUri, IPersistenceEngineOwner(), ComponentConfigProducer(), _cpu_util(), - _threadPool(std::make_unique<FastOS_ThreadPool>(128_Ki)), - _transport(std::make_unique<FNET_Transport>(TransportConfig(1))), + _threadPool(threadPool), + _transport(transport), _configUri(configUri), _mutex(), _metricsHook(std::make_unique<MetricsUpdateHook>(*this)), @@ -238,7 +237,7 @@ Proton::Proton(const config::ConfigUri & configUri, _executor(1, 128_Ki), _protonDiskLayout(), _protonConfigurer(_executor, *this, _protonDiskLayout), - _protonConfigFetcher(*_transport, configUri, _protonConfigurer, subscribeTimeout), + _protonConfigFetcher(_transport, configUri, _protonConfigurer, subscribeTimeout), _shared_service(), _compile_cache_executor_binding(), _queryLimiter(), @@ -260,11 +259,10 @@ Proton::init() { assert( ! _initStarted && ! _initComplete ); _initStarted = true; - _transport->Start(_threadPool.get()); - if (_threadPool->NewThread(_clock.getRunnable(), nullptr) == nullptr) { + if (_threadPool.NewThread(_clock.getRunnable(), nullptr) == nullptr) { throw IllegalStateException("Failed starting thread for the cheap clock"); } - _protonConfigFetcher.start(*_threadPool); + _protonConfigFetcher.start(_threadPool); auto configSnapshot = _protonConfigurer.getPendingConfigSnapshot(); assert(configSnapshot); auto bootstrapConfig = configSnapshot->getBootstrapConfig(); @@ -282,7 +280,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _shared_service = std::make_unique<SharedThreadingService>(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), *_transport); + _shared_service = std::make_unique<SharedThreadingService>(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport); _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_shared_service->transport(), protonConfig.basedir, diskMemUsageSamplerConfig(protonConfig, hwInfo)); @@ -314,10 +312,10 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) strategy = std::make_shared<SimpleFlush>(); break; } - _protonDiskLayout = std::make_unique<ProtonDiskLayout>(*_transport, protonConfig.basedir, protonConfig.tlsspec); + _protonDiskLayout = std::make_unique<ProtonDiskLayout>(_transport, protonConfig.basedir, protonConfig.tlsspec); vespalib::chdir(protonConfig.basedir); vespalib::alloc::MmapFileAllocatorFactory::instance().setup(protonConfig.basedir + "/swapdirs"); - _tls->start(*_transport, hwInfo.cpu().cores()); + _tls->start(_transport, hwInfo.cpu().cores()); _flushEngine = std::make_unique<FlushEngine>(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()), strategy, flush.maxconcurrent, vespalib::from_s(flush.idleinterval)); _metricsEngine->addExternalMetrics(_summaryEngine->getMetrics()); @@ -479,7 +477,6 @@ Proton::~Proton() _compile_cache_executor_binding.reset(); _shared_service.reset(); _clock.stop(); - _transport->ShutDown(true); LOG(debug, "Explicit destructor done"); } @@ -602,7 +599,7 @@ Proton::addDocumentDB(const document::DocumentType &docType, vespalib::string db_dir = config.basedir + "/documents/" + docTypeName.toString(); vespalib::mkdir(db_dir, false); // Assume parent is created. - auto config_store = std::make_unique<FileConfigManager>(*_transport, db_dir + "/config", + auto config_store = std::make_unique<FileConfigManager>(_transport, db_dir + "/config", documentDBConfig->getConfigId(), docTypeName.getName()); config_store->setProtonConfig(bootstrapConfig->getProtonConfigSP()); if (!initializeThreads) { diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 0490b1e00b7..73b8ae83ef2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -82,8 +82,8 @@ private: }; vespalib::CpuUtil _cpu_util; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; + FastOS_ThreadPool & _threadPool; + FNET_Transport & _transport; const config::ConfigUri _configUri; mutable std::shared_mutex _mutex; std::unique_ptr<metrics::UpdateHook> _metricsHook; @@ -145,9 +145,8 @@ public: typedef std::unique_ptr<Proton> UP; typedef std::shared_ptr<Proton> SP; - Proton(const config::ConfigUri & configUri, - const vespalib::string &progName, - vespalib::duration subscribeTimeout); + Proton(FastOS_ThreadPool & threadPool, FNET_Transport & transport, const config::ConfigUri & configUri, + const vespalib::string &progName, vespalib::duration subscribeTimeout); ~Proton() override; /** @@ -183,7 +182,7 @@ public: const std::shared_ptr<DocumentDBConfig> &documentDBConfig, InitializeThreads initializeThreads); metrics::MetricManager & getMetricManager(); - FastOS_ThreadPool & getThreadPool() { return *_threadPool; } + FastOS_ThreadPool & getThreadPool() { return _threadPool; } bool triggerFlush(); bool prepareRestart(); |