From cceca8e48b0c98a0faa93eee742f43a974d0afc3 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 24 Feb 2022 20:31:12 +0100 Subject: Revert "- Create the common transport and threadpool in the main loop." --- .../src/vespa/config/subscription/sourcespec.cpp | 16 ----------- config/src/vespa/config/subscription/sourcespec.h | 31 ++++++++-------------- fnet/src/tests/connect/connect_test.cpp | 4 +-- .../tests/frt/parallel_rpc/parallel_rpc_test.cpp | 2 +- .../transport_debugger/transport_debugger_test.cpp | 2 +- fnet/src/vespa/fnet/frt/supervisor.h | 2 +- fnet/src/vespa/fnet/transport.cpp | 6 ++--- fnet/src/vespa/fnet/transport.h | 11 ++++---- .../src/vespa/messagebus/network/rpcnetwork.cpp | 4 +-- searchcore/src/apps/proton/proton.cpp | 21 ++------------- .../src/vespa/searchcore/proton/server/proton.cpp | 25 +++++++++-------- .../src/vespa/searchcore/proton/server/proton.h | 11 ++++---- .../rpc_mapping_monitor_test.cpp | 9 +++---- slobrok/src/vespa/slobrok/server/sbenv.cpp | 2 +- .../storageserver/rpc/shared_rpc_resources.cpp | 2 +- 15 files changed, 54 insertions(+), 94 deletions(-) diff --git a/config/src/vespa/config/subscription/sourcespec.cpp b/config/src/vespa/config/subscription/sourcespec.cpp index 15f45434056..ec70f921179 100644 --- a/config/src/vespa/config/subscription/sourcespec.cpp +++ b/config/src/vespa/config/subscription/sourcespec.cpp @@ -131,22 +131,6 @@ ServerSpec::createSourceFactory(const TimingValues & timingValues) const timingValues, _traceLevel, vespaVersion, _compressionType); } -ConfigServerSpec::ConfigServerSpec(FNET_Transport & transport) - : ServerSpec(), - _transport(transport) -{ -} - -ConfigServerSpec::~ConfigServerSpec() = default; - -std::unique_ptr -ConfigServerSpec::createSourceFactory(const TimingValues & timingValues) const -{ - const auto vespaVersion = VespaVersion::getCurrentVersion(); - return std::make_unique( - std::make_unique(_transport, *this, timingValues), - timingValues, traceLevel(), vespaVersion, compressionType()); -} ConfigSet::ConfigSet() : _builderMap(std::make_unique()) diff --git a/config/src/vespa/config/subscription/sourcespec.h b/config/src/vespa/config/subscription/sourcespec.h index e2bdd1c61ee..3c23233e3ab 100644 --- a/config/src/vespa/config/subscription/sourcespec.h +++ b/config/src/vespa/config/subscription/sourcespec.h @@ -7,8 +7,6 @@ #include #include -class FNET_Transport; - namespace config { class ConfigInstance; @@ -24,6 +22,7 @@ typedef vespalib::string SourceSpecKey; class SourceSpec { public: + using SourceFactorySP = std::unique_ptr; using UP = std::unique_ptr; /// Convenience typedef /** @@ -37,7 +36,7 @@ public: * @param timingValues Timing values to be used for this source. * @return An std::unique_ptr that can be used to ask for config. */ - virtual std::unique_ptr createSourceFactory(const TimingValues & timingValues) const = 0; + virtual SourceFactorySP createSourceFactory(const TimingValues & timingValues) const = 0; virtual ~SourceSpec() = default; }; @@ -55,7 +54,8 @@ public: */ RawSpec(const vespalib::string & config); - std::unique_ptr createSourceFactory(const TimingValues & timingValues) const override; + // Implements SourceSpec + SourceFactorySP createSourceFactory(const TimingValues & timingValues) const override; /** * Returns the string representation of this config. @@ -89,7 +89,8 @@ public: */ const vespalib::string & getFileName() const { return _fileName; } - std::unique_ptr createSourceFactory(const TimingValues & timingValues) const override; + // Implements SourceSpec + SourceFactorySP createSourceFactory(const TimingValues & timingValues) const override; private: void verifyName(const vespalib::string & fileName); vespalib::string _fileName; @@ -116,7 +117,8 @@ public: */ const vespalib::string & getDirName() const { return _dirName; } - std::unique_ptr createSourceFactory(const TimingValues & timingValues) const override; + // Implements SourceSpec + SourceFactorySP createSourceFactory(const TimingValues & timingValues) const override; private: vespalib::string _dirName; }; @@ -152,7 +154,7 @@ public: */ ServerSpec(const vespalib::string & hostSpec); - std::unique_ptr createSourceFactory(const TimingValues & timingValues) const override; + SourceFactorySP createSourceFactory(const TimingValues & timingValues) const override; /** * Inspect how many hosts this source refers to. @@ -191,18 +193,6 @@ private: const static int DEFAULT_PROXY_PORT = 19090; }; -/** - * A ServerSpec that allows providing externally supplied transport - */ -class ConfigServerSpec : public config::ServerSpec { -public: - ConfigServerSpec(FNET_Transport & transport); - ~ConfigServerSpec() override; - std::unique_ptr createSourceFactory(const TimingValues & timingValues) const override; -private: - FNET_Transport & _transport; -}; - /** @@ -231,7 +221,8 @@ public: */ void addBuilder(const vespalib::string & configId, ConfigInstance * builder); - std::unique_ptr createSourceFactory(const TimingValues & timingValues) const override; + // Implements SourceSpec + SourceFactorySP createSourceFactory(const TimingValues & timingValues) const override; private: BuilderMapSP _builderMap; }; diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 2edb2e694ff..308981542db 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -98,13 +98,13 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { transport.Start(&pool); } TransportFixture(AsyncResolver::HostResolver::SP host_resolver) - : streamer(nullptr), pool(128_Ki), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))), + : streamer(nullptr), pool(128_Ki), transport(TransportConfig().resolver(make_resolver(std::move(host_resolver)))), conn_lost(), conn_deleted() { transport.Start(&pool); } TransportFixture(CryptoEngine::SP crypto) - : streamer(nullptr), pool(128_Ki), transport(fnet::TransportConfig().crypto(std::move(crypto))), + : streamer(nullptr), pool(128_Ki), transport(TransportConfig().crypto(std::move(crypto))), conn_lost(), conn_deleted() { transport.Start(&pool); diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index b027b80f8ab..0fd5029c9b9 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -19,7 +19,7 @@ struct Rpc : FRT_Invokable { FNET_Transport transport; FRT_Supervisor orb; Rpc(CryptoEngine::SP crypto, size_t num_threads, bool drop_empty) - : thread_pool(128_Ki), transport(fnet::TransportConfig(num_threads).crypto(std::move(crypto)).drop_empty_buffers(drop_empty)), orb(&transport) {} + : thread_pool(128_Ki), transport(TransportConfig(num_threads).crypto(std::move(crypto)).drop_empty_buffers(drop_empty)), orb(&transport) {} void start() { ASSERT_TRUE(transport.Start(&thread_pool)); } diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp index a363b1df4c2..044f9da5394 100644 --- a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp +++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp @@ -18,7 +18,7 @@ vespalib::CryptoEngine::SP tls_crypto = std::make_shared #include -namespace fnet { class TransportConfig; } +class TransportConfig; class FNET_Transport; class FRT_Target; class FastOS_ThreadPool; diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index b4e3443b886..a695b95bbac 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -98,6 +98,8 @@ TimeTools::make_debug(vespalib::duration event_timeout, return std::make_shared(event_timeout, std::move(current_time)); } +} // fnet + TransportConfig::TransportConfig(int num_threads) : _config(), _resolver(), @@ -123,9 +125,7 @@ TransportConfig::time_tools() const { return _time_tools ? _time_tools : std::make_shared(); } -} // fnet - -FNET_Transport::FNET_Transport(const fnet::TransportConfig &cfg) +FNET_Transport::FNET_Transport(const TransportConfig &cfg) : _async_resolver(cfg.resolver()), _crypto_engine(cfg.crypto()), _time_tools(cfg.time_tools()), diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index c3101d37b18..1e8faad9c1d 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -4,6 +4,8 @@ #include "context.h" #include "config.h" +#include +#include #include #include #include @@ -37,6 +39,7 @@ struct TimeTools { std::function current_time); }; +} // fnet class TransportConfig { public: @@ -93,8 +96,6 @@ private: uint32_t _num_threads; }; -} // fnet - /** * This class represents the transport layer and handles a collection * of transport threads. Note: remember to shut down your transport @@ -123,12 +124,12 @@ public: * the current thread become the transport thread. Main may only * be called for single-threaded transports. **/ - explicit FNET_Transport(const fnet::TransportConfig &config); + explicit FNET_Transport(const TransportConfig &config); explicit FNET_Transport(uint32_t num_threads) - : FNET_Transport(fnet::TransportConfig(num_threads)) {} + : FNET_Transport(TransportConfig(num_threads)) {} FNET_Transport() - : FNET_Transport(fnet::TransportConfig()) {} + : FNET_Transport(TransportConfig()) {} ~FNET_Transport(); const FNET_Config & getConfig() const { return _config; } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 0f7ebeb9a36..c33f918a39c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -80,9 +80,9 @@ struct TargetPoolTask : public FNET_Task { } }; -fnet::TransportConfig +TransportConfig toFNETConfig(const RPCNetworkParams & params) { - return fnet::TransportConfig(params.getNumNetworkThreads()) + return TransportConfig(params.getNumNetworkThreads()) .maxInputBufferSize(params.getMaxInputBufferSize()) .maxOutputBufferSize(params.getMaxOutputBufferSize()) .tcpNoDelay(params.getTcpNoDelay()); diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp index 3a31f941506..61536f924a5 100644 --- a/searchcore/src/apps/proton/proton.cpp +++ b/searchcore/src/apps/proton/proton.cpp @@ -5,13 +5,9 @@ #include #include #include -#include #include #include #include -#include -#include -#include #include #include #include @@ -177,12 +173,6 @@ ExitOnSignal::operator()() } } -fnet::TransportConfig -buildTransportConfig() { - uint32_t numProcs = std::thread::hardware_concurrency(); - return fnet::TransportConfig(std::max(1u, std::min(4u, numProcs/8))); -} - } int @@ -196,14 +186,8 @@ App::Main() LOG(debug, "serviceidentity: '%s'", params.serviceidentity.c_str()); LOG(debug, "subscribeTimeout: '%" PRIu64 "'", params.subscribeTimeout); std::chrono::milliseconds subscribeTimeout(params.subscribeTimeout); - FastOS_ThreadPool threadPool(128_Ki); - - FNET_Transport transport(buildTransportConfig()); - transport.Start(&threadPool); - config::ConfigServerSpec configServerSpec(transport); - config::ConfigUri identityUri(params.identity, std::make_shared(configServerSpec)); - protonUP = std::make_unique(threadPool, transport, identityUri, - _argc > 0 ? _argv[0] : "proton", subscribeTimeout); + config::ConfigUri identityUri(params.identity); + protonUP = std::make_unique(identityUri, _argc > 0 ? _argv[0] : "proton", subscribeTimeout); proton::Proton & proton = *protonUP; proton::BootstrapConfig::SP configSnapshot = proton.init(); if (proton.hasAbortedInit()) { @@ -244,7 +228,6 @@ App::Main() EV_STOPPING("servicelayer", "clean shutdown"); } protonUP.reset(); - transport.ShutDown(true); EV_STOPPING("proton", "clean shutdown"); } } catch (const vespalib::InvalidCommandLineArgumentsException &e) { diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 8bd965497a8..0f20b0a7b47 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -201,8 +201,9 @@ Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string & cluster } -Proton::Proton(FastOS_ThreadPool & threadPool, FNET_Transport & transport, const config::ConfigUri & configUri, - const vespalib::string &progName, vespalib::duration subscribeTimeout) +Proton::Proton(const config::ConfigUri & configUri, + const vespalib::string &progName, + vespalib::duration subscribeTimeout) : IProtonConfigurerOwner(), search::engine::MonitorServer(), IDocumentDBOwner(), @@ -210,8 +211,8 @@ Proton::Proton(FastOS_ThreadPool & threadPool, FNET_Transport & transport, const IPersistenceEngineOwner(), ComponentConfigProducer(), _cpu_util(), - _threadPool(threadPool), - _transport(transport), + _threadPool(std::make_unique(128_Ki)), + _transport(std::make_unique(TransportConfig(1))), _configUri(configUri), _mutex(), _metricsHook(std::make_unique(*this)), @@ -237,7 +238,7 @@ Proton::Proton(FastOS_ThreadPool & threadPool, FNET_Transport & transport, const _executor(1, 128_Ki), _protonDiskLayout(), _protonConfigurer(_executor, *this, _protonDiskLayout), - _protonConfigFetcher(_transport, configUri, _protonConfigurer, subscribeTimeout), + _protonConfigFetcher(*_transport, configUri, _protonConfigurer, subscribeTimeout), _shared_service(), _compile_cache_executor_binding(), _queryLimiter(), @@ -259,10 +260,11 @@ Proton::init() { assert( ! _initStarted && ! _initComplete ); _initStarted = true; - if (_threadPool.NewThread(_clock.getRunnable(), nullptr) == nullptr) { + _transport->Start(_threadPool.get()); + if (_threadPool->NewThread(_clock.getRunnable(), nullptr) == nullptr) { throw IllegalStateException("Failed starting thread for the cheap clock"); } - _protonConfigFetcher.start(_threadPool); + _protonConfigFetcher.start(*_threadPool); auto configSnapshot = _protonConfigurer.getPendingConfigSnapshot(); assert(configSnapshot); auto bootstrapConfig = configSnapshot->getBootstrapConfig(); @@ -280,7 +282,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _shared_service = std::make_unique(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport); + _shared_service = std::make_unique(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), *_transport); _diskMemUsageSampler = std::make_unique(_shared_service->transport(), protonConfig.basedir, diskMemUsageSamplerConfig(protonConfig, hwInfo)); @@ -312,10 +314,10 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) strategy = std::make_shared(); break; } - _protonDiskLayout = std::make_unique(_transport, protonConfig.basedir, protonConfig.tlsspec); + _protonDiskLayout = std::make_unique(*_transport, protonConfig.basedir, protonConfig.tlsspec); vespalib::chdir(protonConfig.basedir); vespalib::alloc::MmapFileAllocatorFactory::instance().setup(protonConfig.basedir + "/swapdirs"); - _tls->start(_transport, hwInfo.cpu().cores()); + _tls->start(*_transport, hwInfo.cpu().cores()); _flushEngine = std::make_unique(std::make_shared(_tls->getTransLogServer()), strategy, flush.maxconcurrent, vespalib::from_s(flush.idleinterval)); _metricsEngine->addExternalMetrics(_summaryEngine->getMetrics()); @@ -477,6 +479,7 @@ Proton::~Proton() _compile_cache_executor_binding.reset(); _shared_service.reset(); _clock.stop(); + _transport->ShutDown(true); LOG(debug, "Explicit destructor done"); } @@ -599,7 +602,7 @@ Proton::addDocumentDB(const document::DocumentType &docType, vespalib::string db_dir = config.basedir + "/documents/" + docTypeName.toString(); vespalib::mkdir(db_dir, false); // Assume parent is created. - auto config_store = std::make_unique(_transport, db_dir + "/config", + auto config_store = std::make_unique(*_transport, db_dir + "/config", documentDBConfig->getConfigId(), docTypeName.getName()); config_store->setProtonConfig(bootstrapConfig->getProtonConfigSP()); if (!initializeThreads) { diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 73b8ae83ef2..0490b1e00b7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -82,8 +82,8 @@ private: }; vespalib::CpuUtil _cpu_util; - FastOS_ThreadPool & _threadPool; - FNET_Transport & _transport; + std::unique_ptr _threadPool; + std::unique_ptr _transport; const config::ConfigUri _configUri; mutable std::shared_mutex _mutex; std::unique_ptr _metricsHook; @@ -145,8 +145,9 @@ public: typedef std::unique_ptr UP; typedef std::shared_ptr SP; - Proton(FastOS_ThreadPool & threadPool, FNET_Transport & transport, const config::ConfigUri & configUri, - const vespalib::string &progName, vespalib::duration subscribeTimeout); + Proton(const config::ConfigUri & configUri, + const vespalib::string &progName, + vespalib::duration subscribeTimeout); ~Proton() override; /** @@ -182,7 +183,7 @@ public: const std::shared_ptr &documentDBConfig, InitializeThreads initializeThreads); metrics::MetricManager & getMetricManager(); - FastOS_ThreadPool & getThreadPool() { return _threadPool; } + FastOS_ThreadPool & getThreadPool() { return *_threadPool; } bool triggerFlush(); bool prepareRestart(); diff --git a/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp b/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp index 13db95eb35c..0f691e1c168 100644 --- a/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp +++ b/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp @@ -28,11 +28,8 @@ struct Server : FRT_Invokable { last_conn->AddRef(); } } - Server(fnet::TimeTools::SP time_tools) - : frt(fnet::TransportConfig().time_tools(time_tools)), - names(), - inject_fail_cnt(0), - last_conn(nullptr) + Server(fnet::TimeTools::SP time_tools) : frt(TransportConfig().time_tools(time_tools)), names(), + inject_fail_cnt(0), last_conn(nullptr) { FRT_ReflectionBuilder rb(&frt.supervisor()); rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", FRT_METHOD(Server::rpc_listNamesServed), this); @@ -106,7 +103,7 @@ struct RpcMappingMonitorTest : public ::testing::Test { ServiceMapping baz_b; RpcMappingMonitorTest() : debugger(), - my_frt(fnet::TransportConfig().time_tools(debugger.time_tools())), + my_frt(TransportConfig().time_tools(debugger.time_tools())), a(debugger.time_tools()), b(debugger.time_tools()), hist(), diff --git a/slobrok/src/vespa/slobrok/server/sbenv.cpp b/slobrok/src/vespa/slobrok/server/sbenv.cpp index de9e5a0025f..84bf212abb3 100644 --- a/slobrok/src/vespa/slobrok/server/sbenv.cpp +++ b/slobrok/src/vespa/slobrok/server/sbenv.cpp @@ -98,7 +98,7 @@ ConfigTask::PerformTask() } // namespace slobrok:: SBEnv::SBEnv(const ConfigShim &shim) - : _transport(std::make_unique(fnet::TransportConfig().drop_empty_buffers(true))), + : _transport(std::make_unique(TransportConfig().drop_empty_buffers(true))), _supervisor(std::make_unique(_transport.get())), _configShim(shim), _configurator(shim.factory().create(*this)), diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index e1a2dc6b03c..ba3196bfcda 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -65,7 +65,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri, size_t rpc_thread_pool_size, size_t rpc_events_before_wakeup) : _thread_pool(std::make_unique(1024*60)), - _transport(std::make_unique(fnet::TransportConfig(rpc_thread_pool_size). + _transport(std::make_unique(TransportConfig(rpc_thread_pool_size). events_before_wakeup(rpc_events_before_wakeup))), _orb(std::make_unique(_transport.get())), _slobrok_register(std::make_unique(*_orb, slobrok::ConfiguratorFactory(config_uri))), -- cgit v1.2.3