summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-02-24 12:50:31 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-02-24 12:50:31 +0000
commit9d8de3b985984987a4eae67e5d6d3a3fbdfc7efa (patch)
tree70eeb1abeaa1fd22dbfd8ed5f07409f643cc6a9d /searchcore
parent4ed0d2b6f00c82bc3ba8a3290576a7bedfdd1895 (diff)
- Create the common transport and threadpool in the main loop.
- Also use the common transport for config subscriptions. - Put The TransportConfig in the fnet namespace.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/proton/proton.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp25
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h11
3 files changed, 29 insertions, 22 deletions
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp
index 61536f924a5..552e6688532 100644
--- a/searchcore/src/apps/proton/proton.cpp
+++ b/searchcore/src/apps/proton/proton.cpp
@@ -5,9 +5,13 @@
#include <vespa/metrics/metricmanager.h>
#include <vespa/vespalib/util/signalhandler.h>
#include <vespa/vespalib/util/programoptions.h>
+#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/config/common/exceptions.h>
+#include <vespa/config/common/configcontext.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fastos/thread.h>
#include <vespa/fastos/app.h>
#include <iostream>
#include <thread>
@@ -186,8 +190,14 @@ App::Main()
LOG(debug, "serviceidentity: '%s'", params.serviceidentity.c_str());
LOG(debug, "subscribeTimeout: '%" PRIu64 "'", params.subscribeTimeout);
std::chrono::milliseconds subscribeTimeout(params.subscribeTimeout);
- config::ConfigUri identityUri(params.identity);
- protonUP = std::make_unique<proton::Proton>(identityUri, _argc > 0 ? _argv[0] : "proton", subscribeTimeout);
+ FastOS_ThreadPool threadPool(128_Ki);
+ uint32_t numProcs = std::thread::hardware_concurrency();
+ FNET_Transport transport(fnet::TransportConfig(std::max(1u, std::min(4u, numProcs/8))));
+ transport.Start(&threadPool);
+ config::ConfigServerSpec configServerSpec(transport);
+ config::ConfigUri identityUri(params.identity, std::make_shared<config::ConfigContext>(configServerSpec));
+ protonUP = std::make_unique<proton::Proton>(threadPool, transport, identityUri,
+ _argc > 0 ? _argv[0] : "proton", subscribeTimeout);
proton::Proton & proton = *protonUP;
proton::BootstrapConfig::SP configSnapshot = proton.init();
if (proton.hasAbortedInit()) {
@@ -228,6 +238,7 @@ App::Main()
EV_STOPPING("servicelayer", "clean shutdown");
}
protonUP.reset();
+ transport.ShutDown(true);
EV_STOPPING("proton", "clean shutdown");
}
} catch (const vespalib::InvalidCommandLineArgumentsException &e) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 0f20b0a7b47..8bd965497a8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -201,9 +201,8 @@ Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string & cluster
}
-Proton::Proton(const config::ConfigUri & configUri,
- const vespalib::string &progName,
- vespalib::duration subscribeTimeout)
+Proton::Proton(FastOS_ThreadPool & threadPool, FNET_Transport & transport, const config::ConfigUri & configUri,
+ const vespalib::string &progName, vespalib::duration subscribeTimeout)
: IProtonConfigurerOwner(),
search::engine::MonitorServer(),
IDocumentDBOwner(),
@@ -211,8 +210,8 @@ Proton::Proton(const config::ConfigUri & configUri,
IPersistenceEngineOwner(),
ComponentConfigProducer(),
_cpu_util(),
- _threadPool(std::make_unique<FastOS_ThreadPool>(128_Ki)),
- _transport(std::make_unique<FNET_Transport>(TransportConfig(1))),
+ _threadPool(threadPool),
+ _transport(transport),
_configUri(configUri),
_mutex(),
_metricsHook(std::make_unique<MetricsUpdateHook>(*this)),
@@ -238,7 +237,7 @@ Proton::Proton(const config::ConfigUri & configUri,
_executor(1, 128_Ki),
_protonDiskLayout(),
_protonConfigurer(_executor, *this, _protonDiskLayout),
- _protonConfigFetcher(*_transport, configUri, _protonConfigurer, subscribeTimeout),
+ _protonConfigFetcher(_transport, configUri, _protonConfigurer, subscribeTimeout),
_shared_service(),
_compile_cache_executor_binding(),
_queryLimiter(),
@@ -260,11 +259,10 @@ Proton::init()
{
assert( ! _initStarted && ! _initComplete );
_initStarted = true;
- _transport->Start(_threadPool.get());
- if (_threadPool->NewThread(_clock.getRunnable(), nullptr) == nullptr) {
+ if (_threadPool.NewThread(_clock.getRunnable(), nullptr) == nullptr) {
throw IllegalStateException("Failed starting thread for the cheap clock");
}
- _protonConfigFetcher.start(*_threadPool);
+ _protonConfigFetcher.start(_threadPool);
auto configSnapshot = _protonConfigurer.getPendingConfigSnapshot();
assert(configSnapshot);
auto bootstrapConfig = configSnapshot->getBootstrapConfig();
@@ -282,7 +280,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
setBucketCheckSumType(protonConfig);
setFS4Compression(protonConfig);
- _shared_service = std::make_unique<SharedThreadingService>(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), *_transport);
+ _shared_service = std::make_unique<SharedThreadingService>(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport);
_diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_shared_service->transport(), protonConfig.basedir,
diskMemUsageSamplerConfig(protonConfig, hwInfo));
@@ -314,10 +312,10 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
strategy = std::make_shared<SimpleFlush>();
break;
}
- _protonDiskLayout = std::make_unique<ProtonDiskLayout>(*_transport, protonConfig.basedir, protonConfig.tlsspec);
+ _protonDiskLayout = std::make_unique<ProtonDiskLayout>(_transport, protonConfig.basedir, protonConfig.tlsspec);
vespalib::chdir(protonConfig.basedir);
vespalib::alloc::MmapFileAllocatorFactory::instance().setup(protonConfig.basedir + "/swapdirs");
- _tls->start(*_transport, hwInfo.cpu().cores());
+ _tls->start(_transport, hwInfo.cpu().cores());
_flushEngine = std::make_unique<FlushEngine>(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()),
strategy, flush.maxconcurrent, vespalib::from_s(flush.idleinterval));
_metricsEngine->addExternalMetrics(_summaryEngine->getMetrics());
@@ -479,7 +477,6 @@ Proton::~Proton()
_compile_cache_executor_binding.reset();
_shared_service.reset();
_clock.stop();
- _transport->ShutDown(true);
LOG(debug, "Explicit destructor done");
}
@@ -602,7 +599,7 @@ Proton::addDocumentDB(const document::DocumentType &docType,
vespalib::string db_dir = config.basedir + "/documents/" + docTypeName.toString();
vespalib::mkdir(db_dir, false); // Assume parent is created.
- auto config_store = std::make_unique<FileConfigManager>(*_transport, db_dir + "/config",
+ auto config_store = std::make_unique<FileConfigManager>(_transport, db_dir + "/config",
documentDBConfig->getConfigId(), docTypeName.getName());
config_store->setProtonConfig(bootstrapConfig->getProtonConfigSP());
if (!initializeThreads) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 0490b1e00b7..73b8ae83ef2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -82,8 +82,8 @@ private:
};
vespalib::CpuUtil _cpu_util;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
+ FastOS_ThreadPool & _threadPool;
+ FNET_Transport & _transport;
const config::ConfigUri _configUri;
mutable std::shared_mutex _mutex;
std::unique_ptr<metrics::UpdateHook> _metricsHook;
@@ -145,9 +145,8 @@ public:
typedef std::unique_ptr<Proton> UP;
typedef std::shared_ptr<Proton> SP;
- Proton(const config::ConfigUri & configUri,
- const vespalib::string &progName,
- vespalib::duration subscribeTimeout);
+ Proton(FastOS_ThreadPool & threadPool, FNET_Transport & transport, const config::ConfigUri & configUri,
+ const vespalib::string &progName, vespalib::duration subscribeTimeout);
~Proton() override;
/**
@@ -183,7 +182,7 @@ public:
const std::shared_ptr<DocumentDBConfig> &documentDBConfig, InitializeThreads initializeThreads);
metrics::MetricManager & getMetricManager();
- FastOS_ThreadPool & getThreadPool() { return *_threadPool; }
+ FastOS_ThreadPool & getThreadPool() { return _threadPool; }
bool triggerFlush();
bool prepareRestart();