diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-21 19:00:58 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-21 19:00:58 +0100 |
commit | 91d75605d62b4d0604cfb147f4b97863c9112157 (patch) | |
tree | 49b08e0b66fdec37bce82643b5c310db0cb41a7b | |
parent | a7e8bb9dcf3c674a3756e0f0383384593856415a (diff) | |
parent | 9d83cff2851b2126700b1b3cf7b4080ac7c8fb81 (diff) |
Merge pull request #21294 from vespa-engine/revert-21281-balder/use-common-transport
Revert "Use a common FNET_Transport owned by Proton in both SceduledExecutor …"
55 files changed, 282 insertions, 429 deletions
diff --git a/config/src/tests/file_acquirer/file_acquirer_test.cpp b/config/src/tests/file_acquirer/file_acquirer_test.cpp index 7daafc37fcf..f289e42edcb 100644 --- a/config/src/tests/file_acquirer/file_acquirer_test.cpp +++ b/config/src/tests/file_acquirer/file_acquirer_test.cpp @@ -3,16 +3,12 @@ #include <vespa/config/file_acquirer/file_acquirer.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/rpcrequest.h> -#include <vespa/fnet/transport.h> -#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/stringfmt.h> using namespace config; struct ServerFixture : FRT_Invokable { fnet::frt::StandaloneFRT server; - FastOS_ThreadPool threadPool; - FNET_Transport transport; FRT_Supervisor &orb; vespalib::string spec; @@ -24,16 +20,10 @@ struct ServerFixture : FRT_Invokable { rb.ReturnDesc("file_path", "actual path to the requested file"); } - ServerFixture() - : server(), - threadPool(64_Ki), - transport(), - orb(server.supervisor()) - { + ServerFixture() : server(), orb(server.supervisor()) { init_rpc(); orb.Listen(0); spec = vespalib::make_string("tcp/localhost:%d", orb.GetListenPort()); - transport.Start(&threadPool); } void RPC_waitFor(FRT_RPCRequest *req) { @@ -46,12 +36,10 @@ struct ServerFixture : FRT_Invokable { } } - ~ServerFixture() { - transport.ShutDown(true); - } + ~ServerFixture() = default; }; -TEST_FF("require that files can be acquired over rpc", ServerFixture(), RpcFileAcquirer(f1.transport, f1.spec)) { +TEST_FF("require that files can be acquired over rpc", ServerFixture(), RpcFileAcquirer(f1.spec)) { EXPECT_EQUAL("my_path", f2.wait_for("my_ref", 60.0)); EXPECT_EQUAL("", f2.wait_for("bogus_ref", 60.0)); } diff --git a/config/src/vespa/config/file_acquirer/file_acquirer.cpp b/config/src/vespa/config/file_acquirer/file_acquirer.cpp index 9cf280566e0..46782cac9ca 100644 --- a/config/src/vespa/config/file_acquirer/file_acquirer.cpp +++ b/config/src/vespa/config/file_acquirer/file_acquirer.cpp @@ -13,10 +13,14 @@ LOG_SETUP(".config.file_acquirer"); namespace config { -RpcFileAcquirer::RpcFileAcquirer(FNET_Transport & transport, const vespalib::string &spec) - : _orb(std::make_unique<FRT_Supervisor>(&transport)), +RpcFileAcquirer::RpcFileAcquirer(const vespalib::string &spec) + : _threadPool(std::make_unique<FastOS_ThreadPool>(60_Ki)), + _transport(std::make_unique<FNET_Transport>()), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _spec(spec) -{ } +{ + _transport->Start(_threadPool.get()); +} vespalib::string RpcFileAcquirer::wait_for(const vespalib::string &file_ref, double timeout_s) @@ -38,6 +42,9 @@ RpcFileAcquirer::wait_for(const vespalib::string &file_ref, double timeout_s) return path; } -RpcFileAcquirer::~RpcFileAcquirer() = default; +RpcFileAcquirer::~RpcFileAcquirer() +{ + _transport->ShutDown(true); +} } // namespace config diff --git a/config/src/vespa/config/file_acquirer/file_acquirer.h b/config/src/vespa/config/file_acquirer/file_acquirer.h index a6d90fbee15..4088065281d 100644 --- a/config/src/vespa/config/file_acquirer/file_acquirer.h +++ b/config/src/vespa/config/file_acquirer/file_acquirer.h @@ -6,6 +6,7 @@ class FRT_Supervisor; class FNET_Transport; +class FastOS_ThreadPool; namespace config { @@ -25,10 +26,12 @@ struct FileAcquirer { class RpcFileAcquirer : public FileAcquirer { private: - std::unique_ptr<FRT_Supervisor> _orb; - vespalib::string _spec; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + vespalib::string _spec; public: - RpcFileAcquirer(FNET_Transport & transport, const vespalib::string &spec); + RpcFileAcquirer(const vespalib::string &spec); vespalib::string wait_for(const vespalib::string &file_ref, double timeout_s) override; ~RpcFileAcquirer() override; }; diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index a695b95bbac..8661c8ecace 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -6,12 +6,9 @@ #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/rendezvous.h> -#include <vespa/vespalib/util/backtrace.h> +#include <chrono> #include <xxhash.h> -#include <vespa/log/log.h> -LOG_SETUP(".fnet.transport"); - namespace { struct HashState { @@ -133,8 +130,6 @@ FNET_Transport::FNET_Transport(const TransportConfig &cfg) _threads(), _config(cfg.config()) { - // TODO Temporary logging to track down overspend - LOG(debug, "FNET_Transport threads=%d from :%s", cfg.num_threads(), vespalib::getStackTrace(0).c_str()); assert(cfg.num_threads() >= 1); for (size_t i = 0; i < cfg.num_threads(); ++i) { _threads.emplace_back(std::make_unique<FNET_TransportThread>(*this)); diff --git a/searchcore/src/apps/tests/CMakeLists.txt b/searchcore/src/apps/tests/CMakeLists.txt index 532c89d6ab7..12e19d40aca 100644 --- a/searchcore/src/apps/tests/CMakeLists.txt +++ b/searchcore/src/apps/tests/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_executable(searchcore_persistenceconformance_test_app TEST SOURCES persistenceconformance_test.cpp DEPENDS - searchcore_test searchcore_server searchcore_initializer searchcore_reprocessing diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index fd0ace21955..483cc3f2792 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -169,14 +169,14 @@ class DocumentDBFactory : public DummyDBOwner { private: vespalib::string _baseDir; DummyFileHeaderContext _fileHeaderContext; + TransLogServer _tls; vespalib::string _tlsSpec; matching::QueryLimiter _queryLimiter; vespalib::Clock _clock; - mutable DummyWireService _metricsWireService; - mutable MemoryConfigStores _config_stores; + mutable DummyWireService _metricsWireService; + mutable MemoryConfigStores _config_stores; vespalib::ThreadStackExecutor _summaryExecutor; - MockSharedThreadingService _shared_service; - TransLogServer _tls; + MockSharedThreadingService _shared_service; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; static std::shared_ptr<ProtonConfig> make_proton_config() { @@ -196,7 +196,7 @@ public: vespalib::mkdir(_baseDir + "/" + docType.toString(), false); vespalib::string inputCfg = _baseDir + "/" + docType.toString() + "/baseconfig"; { - FileConfigManager fileCfg(_shared_service.transport(), inputCfg, "", docType.getName()); + FileConfigManager fileCfg(inputCfg, "", docType.getName()); fileCfg.saveConfig(*snapshot, 1); } config::DirSpec spec(inputCfg + "/config-1"); @@ -208,7 +208,7 @@ public: std::make_shared<BucketspacesConfig>(), tuneFileDocDB, HwInfo()); mgr.forwardConfig(b); - mgr.nextGeneration(_shared_service.transport(), 0ms); + mgr.nextGeneration(0ms); return DocumentDB::create(_baseDir, mgr.getConfig(), _tlsSpec, _queryLimiter, _clock, docType, bucketSpace, *b->getProtonConfigSP(), const_cast<DocumentDBFactory &>(*this), _shared_service, _bucketExecutor, _tls, _metricsWireService, @@ -221,13 +221,13 @@ public: DocumentDBFactory::DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort) : _baseDir(baseDir), _fileHeaderContext(), + _tls("tls", tlsListenPort, baseDir, _fileHeaderContext), _tlsSpec(vespalib::make_string("tcp/localhost:%d", tlsListenPort)), _queryLimiter(), _clock(), _metricsWireService(), _summaryExecutor(8, 128_Ki), _shared_service(_summaryExecutor, _summaryExecutor), - _tls(_shared_service.transport(), "tls", tlsListenPort, baseDir, _fileHeaderContext), _bucketExecutor(2) {} DocumentDBFactory::~DocumentDBFactory() = default; diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index 40431b90e27..a7f23c382e9 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -8,14 +8,12 @@ #include <vespa/vespalib/util/programoptions.h> #include <vespa/vespalib/util/xmlstream.h> #include <vespa/vespalib/util/time.h> -#include <vespa/vespalib/util/size_literals.h> #include <vespa/document/config/documenttypes_config_fwd.h> #include <vespa/document/config/config-documenttypes.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> #include <vespa/config/helper/configgetter.hpp> -#include <vespa/fnet/transport.h> #include <vespa/fastos/app.h> #include <iostream> #include <thread> @@ -326,7 +324,7 @@ public: */ struct Utility { - virtual ~Utility() = default; + virtual ~Utility() {} typedef std::unique_ptr<Utility> UP; virtual int run() = 0; }; @@ -373,8 +371,6 @@ class BaseUtility : public Utility protected: const BaseOptions &_bopts; DummyFileHeaderContext _fileHeader; - FastOS_ThreadPool _threadPool; - FNET_Transport _transport; TransLogServer _server; client::TransLogClient _client; @@ -382,15 +378,9 @@ public: BaseUtility(const BaseOptions &bopts) : _bopts(bopts), _fileHeader(), - _threadPool(64_Ki), - _transport(), - _server(_transport, _bopts.tlsName, _bopts.listenPort, _bopts.tlsDir, _fileHeader), + _server(_bopts.tlsName, _bopts.listenPort, _bopts.tlsDir, _fileHeader), _client(vespalib::make_string("tcp/localhost:%d", _bopts.listenPort)) { - _transport.Start(&_threadPool); - } - ~BaseUtility() override { - _transport.ShutDown(true); } virtual int run() override = 0; }; diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index faea1cc8b7c..7eda54cdbee 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -176,10 +176,10 @@ class DBContext : public DummyDBOwner { public: DirMaker _dmk; - DummyFileHeaderContext _fileHeaderContext; + DummyFileHeaderContext _fileHeaderContext; + TransLogServer _tls; vespalib::ThreadStackExecutor _summaryExecutor; - MockSharedThreadingService _shared_service; - TransLogServer _tls; + MockSharedThreadingService _shared_service; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; bool _mkdirOk; matching::QueryLimiter _queryLimiter; @@ -198,9 +198,9 @@ public: DBContext(const std::shared_ptr<const DocumentTypeRepo> &repo, const char *docTypeName) : _dmk(docTypeName), _fileHeaderContext(), + _tls("tmp", 9013, ".", _fileHeaderContext), _summaryExecutor(8, 128_Ki), _shared_service(_summaryExecutor, _summaryExecutor), - _tls(_shared_service.transport(), "tmp", 9013, ".", _fileHeaderContext), _bucketExecutor(2), _mkdirOk(FastOS_File::MakeDirectory("tmpdb")), _queryLimiter(), @@ -223,7 +223,7 @@ public: std::make_shared<BucketspacesConfig>(), _tuneFileDocumentDB, _hwInfo); _configMgr.forwardConfig(b); - _configMgr.nextGeneration(_shared_service.transport(), 0ms); + _configMgr.nextGeneration(0ms); if (! FastOS_File::MakeDirectory((std::string("tmpdb/") + docTypeName).c_str())) { LOG_ABORT("should not be reached"); } diff --git a/searchcore/src/tests/proton/documentdb/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/CMakeLists.txt index d36368a8ebd..8ebeaef9a99 100644 --- a/searchcore/src/tests/proton/documentdb/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_executable(searchcore_documentdb_test_app TEST SOURCES documentdb_test.cpp DEPENDS - searchcore_test searchcore_server searchcore_initializer searchcore_reprocessing diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/document_subdbs/CMakeLists.txt index 2706d183988..0bc1235bb6f 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_executable(searchcore_document_subdbs_test_app TEST SOURCES document_subdbs_test.cpp DEPENDS - searchcore_test searchcore_server searchcore_initializer searchcore_reprocessing diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index 6e20d30fb36..b9e3549053a 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -25,7 +25,6 @@ #include <vespa/searchcore/proton/server/reconfig_params.h> #include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/test/test.h> -#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchcore/proton/test/thread_utils.h> #include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/searchlib/index/docbuilder.h> @@ -262,7 +261,7 @@ struct MyConfigSnapshot DocBuilder _builder; DocumentDBConfig::SP _cfg; BootstrapConfig::SP _bootstrap; - MyConfigSnapshot(FNET_Transport & transport, const Schema &schema, const vespalib::string &cfgDir) + MyConfigSnapshot(const Schema &schema, const vespalib::string &cfgDir) : _schema(schema), _builder(_schema), _cfg(), @@ -280,7 +279,7 @@ struct MyConfigSnapshot ::config::DirSpec spec(cfgDir); DocumentDBConfigHelper mgr(spec, "searchdocument"); mgr.forwardConfig(_bootstrap); - mgr.nextGeneration(transport, 1ms); + mgr.nextGeneration(1ms); _cfg = mgr.getConfig(); } }; @@ -288,28 +287,26 @@ struct MyConfigSnapshot template <typename Traits> struct FixtureBase { - TransportMgr _transport; - ThreadStackExecutor _summaryExecutor; + ThreadStackExecutor _summaryExecutor; ExecutorThreadingService _writeService; - typename Traits::Config _cfg; + typename Traits::Config _cfg; std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB; - BucketDBHandler _bucketDBHandler; + BucketDBHandler _bucketDBHandler; typename Traits::Context _ctx; - typename Traits::Schema _baseSchema; - MyConfigSnapshot::UP _snapshot; - DirectoryHandler _baseDir; - typename Traits::SubDB _subDb; - IFeedView::SP _tmpFeedView; + typename Traits::Schema _baseSchema; + MyConfigSnapshot::UP _snapshot; + DirectoryHandler _baseDir; + typename Traits::SubDB _subDb; + IFeedView::SP _tmpFeedView; FixtureBase() - : _transport(), - _summaryExecutor(1, 64_Ki), + : _summaryExecutor(1, 64_Ki), _writeService(_summaryExecutor), _cfg(), _bucketDB(std::make_shared<bucketdb::BucketDBOwner>()), _bucketDBHandler(*_bucketDB), _ctx(_writeService, _bucketDB, _bucketDBHandler), _baseSchema(), - _snapshot(std::make_unique<MyConfigSnapshot>(_transport.transport(), _baseSchema, Traits::ConfigDir::dir())), + _snapshot(std::make_unique<MyConfigSnapshot>(_baseSchema, Traits::ConfigDir::dir())), _baseDir(BASE_DIR + "/" + SUB_NAME, BASE_DIR), _subDb(_cfg._cfg, _ctx._ctx), _tmpFeedView() @@ -349,7 +346,7 @@ struct FixtureBase runInMasterAndSync([&]() { performReconfig(serialNum, reconfigSchema, reconfigConfigDir); }); } void performReconfig(SerialNum serialNum, const Schema &reconfigSchema, const vespalib::string &reconfigConfigDir) { - auto newCfg = std::make_unique<MyConfigSnapshot>(_transport.transport(), reconfigSchema, reconfigConfigDir); + auto newCfg = std::make_unique<MyConfigSnapshot>(reconfigSchema, reconfigConfigDir); DocumentDBConfig::ComparisonResult cmpResult; cmpResult.attributesChanged = true; cmpResult.documenttypesChanged = true; diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index bef8d0c49bb..7ba3e0b8240 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -149,7 +149,7 @@ Fixture::Fixture(bool file_config) _bucketExecutor(2), _db(), _fileHeaderContext(), - _tls(_shared_service.transport(), "tmp", 9014, ".", _fileHeaderContext), + _tls("tmp", 9014, ".", _fileHeaderContext), _queryLimiter(), _clock() { @@ -165,7 +165,7 @@ Fixture::Fixture(bool file_config) std::make_shared<BucketspacesConfig>(), tuneFileDocumentDB, HwInfo()); mgr.forwardConfig(b); - mgr.nextGeneration(_shared_service.transport(), 0ms); + mgr.nextGeneration(0ms); _db = DocumentDB::create(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"), makeBucketSpace(), *b->getProtonConfigSP(), _myDBOwner, _shared_service, _bucketExecutor, _tls, _dummy, @@ -183,7 +183,7 @@ std::unique_ptr<ConfigStore> Fixture::make_config_store() { if (_file_config) { - return std::make_unique<FileConfigManager>(_shared_service.transport(), "config", "", "typea"); + return std::make_unique<FileConfigManager>("config", "", "typea"); } else { return std::make_unique<MemoryConfigStore>(); } diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 9a8d8bad60e..904937a26da 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -27,14 +27,12 @@ #include <vespa/searchcore/proton/server/i_feed_handler_owner.h> #include <vespa/searchcore/proton/server/ireplayconfig.h> #include <vespa/searchcore/proton/test/dummy_feed_view.h> -#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/log/log.h> @@ -409,7 +407,6 @@ struct MyTlsWriter : TlsWriter { struct FeedHandlerFixture { DummyFileHeaderContext _fileHeaderContext; - TransportMgr _transport; TransLogServer tls; vespalib::string tlsSpec; vespalib::ThreadStackExecutor sharedExecutor; @@ -426,8 +423,7 @@ struct FeedHandlerFixture FeedHandler handler; FeedHandlerFixture() : _fileHeaderContext(), - _transport(), - tls(_transport.transport(), "mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)), + tls("mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)), tlsSpec("tcp/localhost:9016"), sharedExecutor(1, 0x10000), writeService(sharedExecutor), diff --git a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp index d51ea25f2f5..68b9d2f8d6e 100644 --- a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp +++ b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp @@ -13,13 +13,11 @@ #include <vespa/searchcore/proton/server/bootstrapconfig.h> #include <vespa/searchcore/proton/server/fileconfigmanager.h> #include <vespa/searchcore/proton/test/documentdb_config_builder.h> -#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchsummary/config/config-juniperrc.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/config-bucketspaces.h> #include <vespa/vespalib/testkit/test_kit.h> - using namespace cloud::config::filedistribution; using namespace config; using namespace document; @@ -41,7 +39,7 @@ using vespalib::nbostream; vespalib::string myId("myconfigid"); DocumentDBConfig::SP -makeBaseConfigSnapshot(FNET_Transport & transport) +makeBaseConfigSnapshot() { ::config::DirSpec spec(TEST_PATH("cfg")); @@ -54,7 +52,7 @@ makeBaseConfigSnapshot(FNET_Transport & transport) std::make_shared<BucketspacesConfig>(), std::make_shared<TuneFileDocumentDB>(), HwInfo()); dbcm.forwardConfig(b); - dbcm.nextGeneration(transport, 0ms); + dbcm.nextGeneration(0ms); DocumentDBConfig::SP snap = dbcm.getConfig(); snap->setConfigId(myId); ASSERT_TRUE(snap); @@ -62,9 +60,9 @@ makeBaseConfigSnapshot(FNET_Transport & transport) } void -saveBaseConfigSnapshot(FNET_Transport & transport, const DocumentDBConfig &snap, SerialNum num) +saveBaseConfigSnapshot(const DocumentDBConfig &snap, SerialNum num) { - FileConfigManager cm(transport, "out", myId, snap.getDocTypeName()); + FileConfigManager cm("out", myId, snap.getDocTypeName()); cm.saveConfig(snap, num); } @@ -127,55 +125,55 @@ addConfigsThatAreNotSavedToDisk(const DocumentDBConfig &cfg) return builder.build(); } -TEST_FF("requireThatConfigCanBeSavedAndLoaded", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) +TEST_F("requireThatConfigCanBeSavedAndLoaded", DocumentDBConfig::SP(makeBaseConfigSnapshot())) { - DocumentDBConfig::SP fullCfg = addConfigsThatAreNotSavedToDisk(*f2); - saveBaseConfigSnapshot(f1.transport(), *fullCfg, 20); + DocumentDBConfig::SP fullCfg = addConfigsThatAreNotSavedToDisk(*f); + saveBaseConfigSnapshot(*fullCfg, 20); DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot()); { - FileConfigManager cm(f1.transport(), "out", myId, "dummy"); + FileConfigManager cm("out", myId, "dummy"); cm.loadConfig(*esnap, 20, esnap); } - assertEqualSnapshot(*f2, *esnap); + assertEqualSnapshot(*f, *esnap); } -TEST_FF("requireThatConfigCanBeSerializedAndDeserialized", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) +TEST_F("requireThatConfigCanBeSerializedAndDeserialized", DocumentDBConfig::SP(makeBaseConfigSnapshot())) { - saveBaseConfigSnapshot(f1.transport(), *f2, 30); + saveBaseConfigSnapshot(*f, 30); nbostream stream; { - FileConfigManager cm(f1.transport(), "out", myId, "dummy"); + FileConfigManager cm("out", myId, "dummy"); cm.serializeConfig(30, stream); } { - FileConfigManager cm(f1.transport(), "out", myId, "dummy"); + FileConfigManager cm("out", myId, "dummy"); cm.deserializeConfig(40, stream); } DocumentDBConfig::SP fsnap(makeEmptyConfigSnapshot()); { - FileConfigManager cm(f1.transport(), "out", myId, "dummy"); + FileConfigManager cm("out", myId, "dummy"); cm.loadConfig(*fsnap, 40, fsnap); } - assertEqualSnapshot(*f2, *fsnap); + assertEqualSnapshot(*f, *fsnap); EXPECT_EQUAL("dummy", fsnap->getDocTypeName()); } -TEST_FF("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) +TEST_F("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", DocumentDBConfig::SP(makeBaseConfigSnapshot())) { - saveBaseConfigSnapshot(f1.transport(), *f2, 70); + saveBaseConfigSnapshot(*f, 70); EXPECT_FALSE(vespalib::unlink("out/config-70/extraconfigs.dat")); DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot()); { - FileConfigManager cm(f1.transport(), "out", myId, "dummy"); + FileConfigManager cm("out", myId, "dummy"); cm.loadConfig(*esnap, 70, esnap); } } -TEST_FF("requireThatVisibilityDelayIsPropagated", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) +TEST_F("requireThatVisibilityDelayIsPropagated", DocumentDBConfig::SP(makeBaseConfigSnapshot())) { - saveBaseConfigSnapshot(f1.transport(), *f2, 80); + saveBaseConfigSnapshot(*f, 80); DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot()); { ProtonConfigBuilder protonConfigBuilder; @@ -184,7 +182,7 @@ TEST_FF("requireThatVisibilityDelayIsPropagated", TransportMgr(), DocumentDBConf ddb.visibilitydelay = 61.0; protonConfigBuilder.documentdb.push_back(ddb); protonConfigBuilder.maxvisibilitydelay = 100.0; - FileConfigManager cm(f1.transport(), "out", myId, "dummy"); + FileConfigManager cm("out", myId, "dummy"); cm.setProtonConfig(std::make_shared<ProtonConfig>(protonConfigBuilder)); cm.loadConfig(*esnap, 70, esnap); } diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 39a5c69376c..9657619be40 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -33,7 +33,6 @@ #include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> #include <vespa/searchcore/proton/test/mock_attribute_manager.h> #include <vespa/searchcore/proton/test/test.h> -#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchlib/common/idocumentmetastore.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/vespalib/data/slime/slime.h> @@ -349,7 +348,6 @@ public: test::DiskMemUsageNotifier _diskMemUsageNotifier; BucketCreateNotifier _bucketCreateNotifier; MonitoredRefCount _refCount; - TransportMgr _transport; MaintenanceController _mc; MaintenanceControllerFixture(); @@ -769,8 +767,7 @@ MaintenanceControllerFixture::MaintenanceControllerFixture() _attributeUsageFilter(), _bucketCreateNotifier(), _refCount(), - _transport(), - _mc(_transport.transport(), _threadService, _genericExecutor, _refCount, _docTypeName) + _mc(_threadService, _genericExecutor, _refCount, _docTypeName) { std::vector<MyDocumentSubDB *> subDBs; subDBs.push_back(&_ready); diff --git a/searchcore/src/tests/proton/proton_config_fetcher/CMakeLists.txt b/searchcore/src/tests/proton/proton_config_fetcher/CMakeLists.txt index 20629733f09..88332f360ce 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/CMakeLists.txt +++ b/searchcore/src/tests/proton/proton_config_fetcher/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_executable(searchcore_proton_config_fetcher_test_app TEST SOURCES proton_config_fetcher_test.cpp DEPENDS - searchcore_test searchcore_server searchcore_fconfig ) diff --git a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp index b4193b0e0b2..3c9e9fc3a64 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp +++ b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp @@ -9,7 +9,6 @@ #include <vespa/searchcore/proton/common/alloc_config.h> #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/searchcore/proton/common/subdbtype.h> -#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchcore/config/config-ranking-constants.h> #include <vespa/searchcore/config/config-ranking-expressions.h> #include <vespa/searchcore/config/config-onnx-models.h> @@ -18,7 +17,6 @@ #include <vespa/document/repo/documenttyperepo.h> #include <vespa/fileacquirer/config-filedistributorrpc.h> #include <vespa/vespalib/util/varholder.h> -#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/config/common/configcontext.h> #include <vespa/config-bucketspaces.h> @@ -65,7 +63,6 @@ struct DoctypeFixture { struct ConfigTestFixture { const std::string configId; - TransportMgr transport; ProtonConfigBuilder protonBuilder; DocumenttypesConfigBuilder documenttypesBuilder; FiledistributorrpcConfigBuilder filedistBuilder; @@ -93,8 +90,6 @@ struct ConfigTestFixture { addDocType("_alwaysthere_"); } - ~ConfigTestFixture() = default; - DoctypeFixture *addDocType(const std::string &name, bool isGlobal = false) { DocumenttypesConfigBuilder::Documenttype dt; dt.bodystruct = -1270491200; @@ -254,7 +249,7 @@ getDocumentDBConfig(ConfigTestFixture &f, DocumentDBConfigManager &mgr, const Hw { ConfigRetriever retriever(mgr.createConfigKeySet(), f.context); mgr.forwardConfig(f.getBootstrapConfig(1, hwInfo)); - mgr.update(f.transport.transport(), retriever.getBootstrapConfigs()); // Cheating, but we only need the configs + mgr.update(retriever.getBootstrapConfigs()); // Cheating, but we only need the configs return mgr.getConfig(); } @@ -305,8 +300,8 @@ TEST_FF("require that documentdb config manager builds schema with imported attr TEST_FFF("require that proton config fetcher follows changes to bootstrap", ConfigTestFixture("search"), ProtonConfigOwner(), - ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(f1.transport.threadPool()); + ProtonConfigFetcher(ConfigUri(f1.configId, f1.context), f2, 60s)) { + f3.start(); ASSERT_TRUE(f2._configured); ASSERT_TRUE(f1.configEqual(f2.getBootstrapConfig())); f2._configured = false; @@ -320,8 +315,8 @@ TEST_FFF("require that proton config fetcher follows changes to bootstrap", TEST_FFF("require that proton config fetcher follows changes to doctypes", ConfigTestFixture("search"), ProtonConfigOwner(), - ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(f1.transport.threadPool()); + ProtonConfigFetcher(ConfigUri(f1.configId, f1.context), f2, 60s)) { + f3.start(); f2._configured = false; f1.addDocType("typea"); @@ -340,8 +335,8 @@ TEST_FFF("require that proton config fetcher follows changes to doctypes", TEST_FFF("require that proton config fetcher reconfigures dbowners", ConfigTestFixture("search"), ProtonConfigOwner(), - ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(f1.transport.threadPool()); + ProtonConfigFetcher(ConfigUri(f1.configId, f1.context), f2, 60s)) { + f3.start(); ASSERT_FALSE(f2.getDocumentDBConfig("typea")); // Add db and verify that config for db is provided diff --git a/searchcore/src/tests/proton/proton_disk_layout/CMakeLists.txt b/searchcore/src/tests/proton/proton_disk_layout/CMakeLists.txt index 5b6c84393f7..1c22818630c 100644 --- a/searchcore/src/tests/proton/proton_disk_layout/CMakeLists.txt +++ b/searchcore/src/tests/proton/proton_disk_layout/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_executable(searchcore_proton_disk_layout_test_app TEST SOURCES proton_disk_layout_test.cpp DEPENDS - searchcore_test searchcore_server searchcore_fconfig ) diff --git a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp index 08b9a55746b..17bd5991b68 100644 --- a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp +++ b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp @@ -2,7 +2,6 @@ #include <vespa/searchcore/proton/server/proton_disk_layout.h> #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/searchlib/transactionlog/translogclient.h> @@ -16,7 +15,6 @@ using search::transactionlog::client::TransLogClient; using search::transactionlog::TransLogServer; using proton::DocTypeName; using proton::ProtonDiskLayout; -using proton::TransportMgr; static constexpr unsigned int tlsPort = 9018; @@ -31,7 +29,6 @@ struct FixtureBase struct DiskLayoutFixture { DummyFileHeaderContext _fileHeaderContext; - TransportMgr _transport; TransLogServer _tls; vespalib::string _tlsSpec; ProtonDiskLayout _diskLayout; @@ -94,8 +91,7 @@ struct DiskLayoutFixture { DiskLayoutFixture::DiskLayoutFixture() : _fileHeaderContext(), - _transport(), - _tls(_transport.transport(), "tls", tlsPort, baseDir, _fileHeaderContext), + _tls("tls", tlsPort, baseDir, _fileHeaderContext), _tlsSpec(vespalib::make_string("tcp/localhost:%u", tlsPort)), _diskLayout(baseDir, _tlsSpec) { diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/CMakeLists.txt b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/CMakeLists.txt index 0b126ccff27..02b3483153d 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/CMakeLists.txt +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_executable(searchcore_disk_mem_usage_sampler_test_app TEST SOURCES disk_mem_usage_sampler_test.cpp DEPENDS - searchcore_test searchcore_server GTest::GTest ) diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp index 9469c8b055f..0c80553e1e7 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp @@ -3,8 +3,8 @@ #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/searchcore/proton/common/i_transient_resource_usage_provider.h> #include <vespa/searchcore/proton/server/disk_mem_usage_sampler.h> -#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/vespalib/gtest/gtest.h> +#include <chrono> #include <thread> #include <vespa/log/log.h> @@ -38,19 +38,16 @@ public: }; struct DiskMemUsageSamplerTest : public ::testing::Test { - TransportMgr transport; - std::unique_ptr<DiskMemUsageSampler> sampler; - DiskMemUsageSamplerTest() - : transport(), - sampler(std::make_unique<DiskMemUsageSampler>(transport.transport(), ".", DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()))) + DiskMemUsageSampler sampler; + DiskMemUsageSamplerTest(): + sampler(".", + DiskMemUsageSampler::Config(0.8, 0.8, + 50ms, make_hw_info())) { - sampler->add_transient_usage_provider(std::make_shared<MyProvider>(50, 200)); - sampler->add_transient_usage_provider(std::make_shared<MyProvider>(100, 150)); + sampler.add_transient_usage_provider(std::make_shared<MyProvider>(50, 200)); + sampler.add_transient_usage_provider(std::make_shared<MyProvider>(100, 150)); } - ~DiskMemUsageSamplerTest() { - sampler.reset(); - } - const DiskMemUsageFilter& filter() const { return sampler->writeFilter(); } + const DiskMemUsageFilter& filter() const { return sampler.writeFilter(); } }; TEST_F(DiskMemUsageSamplerTest, resource_usage_is_sampled) diff --git a/searchcore/src/tests/proton/server/shared_threading_service/CMakeLists.txt b/searchcore/src/tests/proton/server/shared_threading_service/CMakeLists.txt index 61325ccb125..9b40ae19c99 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/CMakeLists.txt +++ b/searchcore/src/tests/proton/server/shared_threading_service/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_executable(searchcore_shared_threading_service_test_app TEST SOURCES shared_threading_service_test.cpp DEPENDS - searchcore_test searchcore_server GTest::GTest ) diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp index 1b8c8d8491b..bede9f967a4 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp +++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp @@ -3,7 +3,6 @@ #include <vespa/searchcore/config/config-proton.h> #include <vespa/searchcore/proton/server/shared_threading_service.h> #include <vespa/searchcore/proton/server/shared_threading_service_config.h> -#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/gtest/gtest.h> @@ -48,16 +47,14 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores class SharedThreadingServiceTest : public ::testing::Test { public: - TransportMgr transport; std::unique_ptr<SharedThreadingService> service; SharedThreadingServiceTest() - : transport(), - service() - { } - ~SharedThreadingServiceTest() = default; + : service() + { + } void setup(double concurrency, uint32_t cpu_cores) { service = std::make_unique<SharedThreadingService>( - SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)), transport.transport()); + SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores))); } SequencedTaskExecutor* field_writer() { return dynamic_cast<SequencedTaskExecutor*>(service->field_writer()); diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt index 60d3a05502a..ac9ae5519f7 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt @@ -32,7 +32,6 @@ vespa_add_library(searchcore_bmcluster STATIC storage_api_rpc_bm_feed_handler.cpp storage_reply_error_checker.cpp DEPENDS - searchcore_test searchcore_server searchcore_initializer searchcore_reprocessing diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 85fd304f395..3587e8008f2 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -449,6 +449,7 @@ class MyBmNode : public BmNode int _distributor_mbus_port; int _distributor_rpc_port; int _distributor_status_port; + TransLogServer _tls; vespalib::string _tls_spec; proton::matching::QueryLimiter _query_limiter; vespalib::Clock _clock; @@ -456,7 +457,6 @@ class MyBmNode : public BmNode proton::MemoryConfigStores _config_stores; vespalib::ThreadStackExecutor _summary_executor; proton::MockSharedThreadingService _shared_service; - TransLogServer _tls; proton::DummyDBOwner _document_db_owner; BucketSpace _bucket_space; std::shared_ptr<DocumentDB> _document_db; @@ -514,6 +514,7 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod _distributor_mbus_port(port_number(base_port, PortBias::DISTRIBUTOR_MBUS_PORT)), _distributor_rpc_port(port_number(base_port, PortBias::DISTRIBUTOR_RPC_PORT)), _distributor_status_port(port_number(base_port, PortBias::DISTRIBUTOR_STATUS_PORT)), + _tls("tls", _tls_listen_port, _base_dir, _file_header_context), _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)), _query_limiter(), _clock(), @@ -521,7 +522,6 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod _config_stores(), _summary_executor(8, 128_Ki), _shared_service(_summary_executor, _summary_executor), - _tls(_shared_service.transport(), "tls", _tls_listen_port, _base_dir, _file_header_context), _document_db_owner(), _bucket_space(document::test::makeBucketSpace(_doc_type_name.getName())), _document_db(), @@ -568,7 +568,7 @@ MyBmNode::create_document_db(const BmClusterParams& params) vespalib::mkdir(_base_dir + "/" + _doc_type_name.getName(), false); vespalib::string input_cfg = _base_dir + "/" + _doc_type_name.getName() + "/baseconfig"; { - proton::FileConfigManager fileCfg(_shared_service.transport(), input_cfg, "", _doc_type_name.getName()); + proton::FileConfigManager fileCfg(input_cfg, "", _doc_type_name.getName()); fileCfg.saveConfig(*_document_db_config, 1); } config::DirSpec spec(input_cfg + "/config-1"); @@ -590,7 +590,7 @@ MyBmNode::create_document_db(const BmClusterParams& params) std::make_shared<BucketspacesConfig>(), tuneFileDocDB, HwInfo()); mgr.forwardConfig(bootstrap_config); - mgr.nextGeneration(_shared_service.transport(), 0ms); + mgr.nextGeneration(0ms); _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name, _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner, _shared_service, *_persistence_engine, _tls, diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 7054c7077c8..2dc749ce26b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -10,12 +10,12 @@ using vespalib::makeLambdaTask; namespace proton { -DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::string &path_in, const Config &config) +DiskMemUsageSampler::DiskMemUsageSampler(const std::string &path_in, const Config &config) : _filter(config.hwInfo), _path(path_in), _sampleInterval(60s), _lastSampleTime(vespalib::steady_clock::now()), - _periodicTimer(std::make_unique<vespalib::ScheduledExecutor>(transport)), + _periodicTimer(), _lock(), _transient_usage_providers() { @@ -30,11 +30,12 @@ DiskMemUsageSampler::~DiskMemUsageSampler() void DiskMemUsageSampler::setConfig(const Config &config) { - _periodicTimer->reset(); + _periodicTimer.reset(); _filter.setConfig(config.filterConfig); _sampleInterval = config.sampleInterval; sampleUsage(); _lastSampleTime = vespalib::steady_clock::now(); + _periodicTimer = std::make_unique<vespalib::ScheduledExecutor>(); vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval); _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index 7475282e718..74e14bc65de 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -5,8 +5,6 @@ #include <vespa/vespalib/util/time.h> #include "disk_mem_usage_filter.h" -class FNET_Transport; - namespace vespalib { class ScheduledExecutor; } namespace proton { @@ -53,8 +51,7 @@ public: } }; - DiskMemUsageSampler(FNET_Transport & transport, - const std::string &path_in, + DiskMemUsageSampler(const std::string &path_in, const Config &config); ~DiskMemUsageSampler(); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 078e5a69fb3..a30aa916896 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -15,6 +15,7 @@ #include "replay_throttling_policy.h" #include <vespa/document/repo/documenttyperepo.h> #include <vespa/metrics/updatehook.h> +#include <vespa/searchcore/proton/attribute/attribute_config_inspector.h> #include <vespa/searchcore/proton/attribute/attribute_writer.h> #include <vespa/searchcore/proton/attribute/i_attribute_usage_listener.h> #include <vespa/searchcore/proton/attribute/imported_attributes_repo.h> @@ -219,7 +220,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _feedHandler(std::make_unique<FeedHandler>(_writeService, tlsSpec, docTypeName, *this, _writeFilter, *this, tlsWriterFactory)), _subDBs(*this, *this, *_feedHandler, _docTypeName, _writeService, shared_service.warmup(), fileHeaderContext, metricsWireService, getMetrics(), queryLimiter, clock, _configMutex, _baseDir, hwInfo), - _maintenanceController(shared_service.transport(), _writeService.master(), shared_service.shared(), _refCount, _docTypeName), + _maintenanceController(_writeService.master(), shared_service.shared(), _refCount, _docTypeName), _jobTrackers(), _calc(), _metricsUpdater(_subDBs, _writeService, _jobTrackers, *_sessionManager, _writeFilter, *_feedHandler) diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index 54561ec6304..b7c319d46f5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -272,19 +272,18 @@ build_alloc_config(const ProtonConfig& proton_config, const vespalib::string& do distribution_config.redundancy, distribution_config.searchablecopies); } -vespalib::string -resolve_file(config::RpcFileAcquirer &fileAcquirer, vespalib::TimeBox &timeBox, - const vespalib::string &desc, const vespalib::string &fileref) +vespalib::string resolve_file(config::RpcFileAcquirer &fileAcquirer, vespalib::TimeBox &timeBox, + const vespalib::string &desc, const vespalib::string &fileref) { vespalib::string filePath; - LOG(debug, "Waiting for file acquirer (%s, ref='%s')", desc.c_str(), fileref.c_str()); + LOG(info, "Waiting for file acquirer (%s, ref='%s')", desc.c_str(), fileref.c_str()); while (timeBox.hasTimeLeft() && (filePath == "")) { filePath = fileAcquirer.wait_for(fileref, timeBox.timeLeft()); if (filePath == "") { - std::this_thread::sleep_for(100ms); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } - LOG(debug, "Got file path from file acquirer: '%s' (%s, ref='%s')", filePath.c_str(), desc.c_str(), fileref.c_str()); + LOG(info, "Got file path from file acquirer: '%s' (%s, ref='%s')", filePath.c_str(), desc.c_str(), fileref.c_str()); if (filePath == "") { throw config::ConfigTimeoutException(fmt("could not get file path from file acquirer for %s (ref=%s)", desc.c_str(), fileref.c_str())); @@ -295,7 +294,7 @@ resolve_file(config::RpcFileAcquirer &fileAcquirer, vespalib::TimeBox &timeBox, } void -DocumentDBConfigManager::update(FNET_Transport & transport, const ConfigSnapshot &snapshot) +DocumentDBConfigManager::update(const ConfigSnapshot &snapshot) { using RankProfilesConfigSP = DocumentDBConfig::RankProfilesConfigSP; using RankingConstantsConfigSP = std::shared_ptr<vespa::config::search::core::RankingConstantsConfig>; @@ -356,7 +355,7 @@ DocumentDBConfigManager::update(FNET_Transport & transport, const ConfigSnapshot const vespalib::string &spec = _bootstrapConfig->getFiledistributorrpcConfig().connectionspec; RankingConstants::Vector constants; if (spec != "") { - config::RpcFileAcquirer fileAcquirer(transport, spec); + config::RpcFileAcquirer fileAcquirer(spec); vespalib::TimeBox timeBox(5*60, 5); for (const RankingConstantsConfig::Constant &rc : newRankingConstantsConfig->constant) { auto desc = fmt("name='%s', type='%s'", rc.name.c_str(), rc.type.c_str()); @@ -372,7 +371,7 @@ DocumentDBConfigManager::update(FNET_Transport & transport, const ConfigSnapshot const vespalib::string &spec = _bootstrapConfig->getFiledistributorrpcConfig().connectionspec; RankingExpressions expressions; if (spec != "") { - config::RpcFileAcquirer fileAcquirer(transport, spec); + config::RpcFileAcquirer fileAcquirer(spec); vespalib::TimeBox timeBox(5*60, 5); for (const RankingExpressionsConfig::Expression &rc : newRankingExpressionsConfig->expression) { auto desc = fmt("name='%s'", rc.name.c_str()); @@ -388,7 +387,7 @@ DocumentDBConfigManager::update(FNET_Transport & transport, const ConfigSnapshot const vespalib::string &spec = _bootstrapConfig->getFiledistributorrpcConfig().connectionspec; OnnxModels::Vector models; if (spec != "") { - config::RpcFileAcquirer fileAcquirer(transport, spec); + config::RpcFileAcquirer fileAcquirer(spec); vespalib::TimeBox timeBox(5*60, 5); for (const OnnxModelsConfig::Model &rc : newOnnxModelsConfig->model) { auto desc = fmt("name='%s'", rc.name.c_str()); @@ -497,12 +496,12 @@ DocumentDBConfigHelper::DocumentDBConfigHelper(const DirSpec &spec, const vespal DocumentDBConfigHelper::~DocumentDBConfigHelper() = default; bool -DocumentDBConfigHelper::nextGeneration(FNET_Transport & transport, vespalib::duration timeout) +DocumentDBConfigHelper::nextGeneration(std::chrono::milliseconds timeoutInMillis) { - ConfigSnapshot snapshot(_retriever->getBootstrapConfigs(timeout)); + ConfigSnapshot snapshot(_retriever->getBootstrapConfigs(timeoutInMillis)); if (snapshot.empty()) return false; - _mgr.update(transport, snapshot); + _mgr.update(snapshot); return true; } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h index 74f58bca9a9..ad5959d551b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h @@ -5,8 +5,6 @@ #include "documentdbconfig.h" #include <mutex> -class FNET_Transport; - namespace config { class ConfigRetriever; class DirSpec; @@ -40,7 +38,7 @@ private: public: DocumentDBConfigManager(const vespalib::string &configId, const vespalib::string &docTypeName); ~DocumentDBConfigManager(); - void update(FNET_Transport & transport, const config::ConfigSnapshot & snapshot); + void update(const config::ConfigSnapshot & snapshot); DocumentDBConfig::SP getConfig() const; @@ -58,7 +56,7 @@ public: DocumentDBConfigHelper(const config::DirSpec &spec, const vespalib::string &docTypeName); ~DocumentDBConfigHelper(); - bool nextGeneration(FNET_Transport & transport, vespalib::duration timeout); + bool nextGeneration(std::chrono::milliseconds timeoutInMillis); DocumentDBConfig::SP getConfig() const; void forwardConfig(const std::shared_ptr<BootstrapConfig> & config); private: diff --git a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp index 8ffa6c0a84d..34369a0803e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp @@ -207,12 +207,10 @@ getFileList(const vespalib::string &snapDir) } -FileConfigManager::FileConfigManager(FNET_Transport & transport, - const vespalib::string &baseDir, +FileConfigManager::FileConfigManager(const vespalib::string &baseDir, const vespalib::string &configId, const vespalib::string &docTypeName) - : _transport(transport), - _baseDir(baseDir), + : _baseDir(baseDir), _configId(configId), _docTypeName(docTypeName), _info(baseDir), @@ -360,7 +358,7 @@ FileConfigManager::loadConfig(const DocumentDBConfig ¤tSnapshot, search::S bucketspaces,currentSnapshot.getTuneFileDocumentDBSP(), sampler.hwInfo()); dbc.forwardConfig(bootstrap); - dbc.nextGeneration(_transport, 0ms); + dbc.nextGeneration(0ms); loadedSnapshot = dbc.getConfig(); loadedSnapshot->setConfigId(_configId); diff --git a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.h b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.h index cca68d7d84c..417ab6cf394 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.h @@ -11,12 +11,11 @@ namespace proton { class FileConfigManager : public ConfigStore { private: - FNET_Transport & _transport; - vespalib::string _baseDir; - vespalib::string _configId; - vespalib::string _docTypeName; - search::IndexMetaInfo _info; - ProtonConfigSP _protonConfig; + vespalib::string _baseDir; + vespalib::string _configId; + vespalib::string _docTypeName; + search::IndexMetaInfo _info; + ProtonConfigSP _protonConfig; public: /** @@ -25,8 +24,9 @@ public: * @param baseDir the directory in which config snapshots are saved and loaded. * @param configId the configId that was used to subscribe to config that is later handled by this manager. */ - FileConfigManager(FNET_Transport & transport, const vespalib::string &baseDir, - const vespalib::string &configId, const vespalib::string &docTypeName); + FileConfigManager(const vespalib::string &baseDir, + const vespalib::string &configId, + const vespalib::string &docTypeName); ~FileConfigManager() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h index a467f9bc320..dfa48cb8d1a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h @@ -1,8 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -class FNET_Transport; - namespace vespalib { class ISequencedTaskExecutor; class ThreadExecutor; @@ -16,7 +14,7 @@ namespace proton { */ class ISharedThreadingService { public: - virtual ~ISharedThreadingService() = default; + virtual ~ISharedThreadingService() {} /** * Returns the executor used for warmup (e.g. index warmup). @@ -46,11 +44,6 @@ public: * Returns an InvokeService intended for regular wakeup calls. */ virtual vespalib::InvokeService & invokeService() = 0; - - /** - * Returns a shared transport object that can be utilized by multiple services. - */ - virtual FNET_Transport & transport() = 0; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 0df211b5a0b..fa4bae8f01b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -7,7 +7,6 @@ #include <vespa/searchcorespi/index/i_thread_service.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/scheduledexecutor.h> -#include <vespa/fastos/thread.h> #include <thread> #include <vespa/log/log.h> @@ -40,8 +39,7 @@ isRunnable(const MaintenanceJobRunner & job, const Executor * master) { } -MaintenanceController::MaintenanceController(FNET_Transport & transport, - ISyncableThreadService& masterThread, +MaintenanceController::MaintenanceController(ISyncableThreadService& masterThread, vespalib::Executor& shared_executor, MonitoredRefCount& refCount, const DocTypeName& docTypeName) @@ -51,7 +49,7 @@ MaintenanceController::MaintenanceController(FNET_Transport & transport, _readySubDB(), _remSubDB(), _notReadySubDB(), - _periodicTimer(std::make_unique<vespalib::ScheduledExecutor>(transport)), + _periodicTimer(), _config(), _state(State::INITIALIZING), _docTypeName(docTypeName), @@ -95,7 +93,7 @@ MaintenanceController::killJobs() // Called by master write thread assert(_masterThread.isCurrentThread()); LOG(debug, "killJobs(): threadId=%zu", (size_t)FastOS_Thread::GetCurrentThreadId()); - _periodicTimer->reset(); + _periodicTimer.reset(); // No need to take _jobsLock as modification of _jobs also happens in master write thread. for (auto &job : _jobs) { job->stop(); // Make sure no more tasks are added to the executor @@ -174,7 +172,7 @@ MaintenanceController::restart() if (!getStarted() || getStopping() || !_readySubDB.valid()) { return; } - _periodicTimer->reset(); + _periodicTimer = std::make_unique<vespalib::ScheduledExecutor>(); addJobsToPeriodicTimer(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 8e5bb8d860c..086f5a36404 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -42,7 +42,7 @@ public: using UP = std::unique_ptr<MaintenanceController>; enum class State {INITIALIZING, STARTED, PAUSED, STOPPING}; - MaintenanceController(FNET_Transport & transport, ISyncableThreadService& masterThread, vespalib::Executor& shared_executor, + MaintenanceController(ISyncableThreadService& masterThread, vespalib::Executor& shared_executor, vespalib::MonitoredRefCount& refCount, const DocTypeName& docTypeName); ~MaintenanceController(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 3aaf00a6541..642f8746587 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -43,7 +43,6 @@ #include <vespa/vespalib/util/random.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/fnet/transport.h> #ifdef __linux__ #include <malloc.h> #endif @@ -203,7 +202,7 @@ Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string & cluster Proton::Proton(const config::ConfigUri & configUri, const vespalib::string &progName, - vespalib::duration subscribeTimeout) + std::chrono::milliseconds subscribeTimeout) : IProtonConfigurerOwner(), search::engine::MonitorServer(), IDocumentDBOwner(), @@ -211,8 +210,6 @@ Proton::Proton(const config::ConfigUri & configUri, IPersistenceEngineOwner(), ComponentConfigProducer(), _cpu_util(), - _threadPool(std::make_unique<FastOS_ThreadPool>(128_Ki)), - _transport(std::make_unique<FNET_Transport>(TransportConfig(1))), _configUri(configUri), _mutex(), _metricsHook(std::make_unique<MetricsUpdateHook>(*this)), @@ -238,11 +235,12 @@ Proton::Proton(const config::ConfigUri & configUri, _executor(1, 128_Ki), _protonDiskLayout(), _protonConfigurer(_executor, *this, _protonDiskLayout), - _protonConfigFetcher(*_transport, configUri, _protonConfigurer, subscribeTimeout), + _protonConfigFetcher(configUri, _protonConfigurer, subscribeTimeout), _shared_service(), _compile_cache_executor_binding(), _queryLimiter(), _clock(1.0/vespalib::getVespaTimerHz()), + _threadPool(128_Ki), _distributionKey(-1), _isInitializing(true), _abortInit(false), @@ -260,11 +258,10 @@ Proton::init() { assert( ! _initStarted && ! _initComplete ); _initStarted = true; - _transport->Start(_threadPool.get()); - if (_threadPool->NewThread(_clock.getRunnable(), nullptr) == nullptr) { + if (_threadPool.NewThread(_clock.getRunnable(), nullptr) == nullptr) { throw IllegalStateException("Failed starting thread for the cheap clock"); } - _protonConfigFetcher.start(*_threadPool); + _protonConfigFetcher.start(); auto configSnapshot = _protonConfigurer.getPendingConfigSnapshot(); assert(configSnapshot); auto bootstrapConfig = configSnapshot->getBootstrapConfig(); @@ -282,8 +279,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _shared_service = std::make_unique<SharedThreadingService>(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), *_transport); - _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(_shared_service->transport(), protonConfig.basedir, + _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>(protonConfig.basedir, diskMemUsageSamplerConfig(protonConfig, hwInfo)); _tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); @@ -317,7 +313,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _protonDiskLayout = std::make_unique<ProtonDiskLayout>(protonConfig.basedir, protonConfig.tlsspec); vespalib::chdir(protonConfig.basedir); vespalib::alloc::MmapFileAllocatorFactory::instance().setup(protonConfig.basedir + "/swapdirs"); - _tls->start(*_transport, hwInfo.cpu().cores()); + _tls->start(hwInfo.cpu().cores()); _flushEngine = std::make_unique<FlushEngine>(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()), strategy, flush.maxconcurrent, vespalib::from_s(flush.idleinterval)); _metricsEngine->addExternalMetrics(_summaryEngine->getMetrics()); @@ -332,6 +328,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) protonConfig.visit.ignoremaxbytes); vespalib::string fileConfigId; + _shared_service = std::make_unique<SharedThreadingService>(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu())); _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { @@ -479,7 +476,6 @@ Proton::~Proton() _compile_cache_executor_binding.reset(); _shared_service.reset(); _clock.stop(); - _transport->ShutDown(true); LOG(debug, "Explicit destructor done"); } @@ -602,8 +598,9 @@ Proton::addDocumentDB(const document::DocumentType &docType, vespalib::string db_dir = config.basedir + "/documents/" + docTypeName.toString(); vespalib::mkdir(db_dir, false); // Assume parent is created. - auto config_store = std::make_unique<FileConfigManager>(*_transport, db_dir + "/config", - documentDBConfig->getConfigId(), docTypeName.getName()); + auto config_store = std::make_unique<FileConfigManager>(db_dir + "/config", + documentDBConfig->getConfigId(), + docTypeName.getName()); config_store->setProtonConfig(bootstrapConfig->getProtonConfigSP()); if (!initializeThreads) { // If configured value for initialize threads was 0, or we diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 0490b1e00b7..61e381cfebb 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -82,8 +82,6 @@ private: }; vespalib::CpuUtil _cpu_util; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; const config::ConfigUri _configUri; mutable std::shared_mutex _mutex; std::unique_ptr<metrics::UpdateHook> _metricsHook; @@ -112,6 +110,7 @@ private: vespalib::eval::CompileCache::ExecutorBinding::UP _compile_cache_executor_binding; matching::QueryLimiter _queryLimiter; vespalib::Clock _clock; + FastOS_ThreadPool _threadPool; uint32_t _distributionKey; bool _isInitializing; bool _abortInit; @@ -147,7 +146,7 @@ public: Proton(const config::ConfigUri & configUri, const vespalib::string &progName, - vespalib::duration subscribeTimeout); + std::chrono::milliseconds subscribeTimeout); ~Proton() override; /** @@ -183,7 +182,7 @@ public: const std::shared_ptr<DocumentDBConfig> &documentDBConfig, InitializeThreads initializeThreads); metrics::MetricManager & getMetricManager(); - FastOS_ThreadPool & getThreadPool() { return *_threadPool; } + FastOS_ThreadPool & getThreadPool() { return _threadPool; } bool triggerFlush(); bool prepareRestart(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp index f956a370ffa..485f14b8fb9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp @@ -20,13 +20,13 @@ using namespace std::chrono_literals; namespace proton { -ProtonConfigFetcher::ProtonConfigFetcher(FNET_Transport & transport, const config::ConfigUri & configUri, IProtonConfigurer &owner, vespalib::duration subscribeTimeout) - : _transport(transport), - _bootstrapConfigManager(configUri.getConfigId()), +ProtonConfigFetcher::ProtonConfigFetcher(const config::ConfigUri & configUri, IProtonConfigurer &owner, std::chrono::milliseconds subscribeTimeout) + : _bootstrapConfigManager(configUri.getConfigId()), _retriever(_bootstrapConfigManager.createConfigKeySet(), configUri.getContext(), subscribeTimeout), _owner(owner), _mutex(), _dbManagerMap(), + _threadPool(128_Ki, 1), _oldDocumentTypeRepos(), _currentDocumentTypeRepo() { @@ -60,7 +60,8 @@ ProtonConfigFetcher::pruneManagerMap(const BootstrapConfig::SP & config) ConfigKeySet set; lock_guard guard(_mutex); - for (const ProtonConfig::Documentdb & ddb : protonConfig.documentdb) { + for (size_t i = 0; i < protonConfig.documentdb.size(); i++) { + const ProtonConfig::Documentdb & ddb(protonConfig.documentdb[i]); DocTypeName docTypeName(ddb.inputdoctypename); LOG(debug, "Document type(%s), configid(%s)", ddb.inputdoctypename.c_str(), ddb.configid.c_str()); DocumentDBConfigManager::SP mgr; @@ -82,7 +83,7 @@ ProtonConfigFetcher::updateDocumentDBConfigs(const BootstrapConfig::SP & bootstr lock_guard guard(_mutex); for (auto & entry : _dbManagerMap) { entry.second->forwardConfig(bootstrapConfig); - entry.second->update(_transport, snapshot); + entry.second->update(snapshot); } } @@ -161,10 +162,10 @@ ProtonConfigFetcher::getGeneration() const } void -ProtonConfigFetcher::start(FastOS_ThreadPool & threadPool) +ProtonConfigFetcher::start() { fetchConfigs(); - if (threadPool.NewThread(this, nullptr) == nullptr) { + if (_threadPool.NewThread(this, nullptr) == nullptr) { throw vespalib::IllegalStateException( "Failed starting thread for proton config fetcher"); } @@ -175,6 +176,7 @@ ProtonConfigFetcher::close() { if (!_retriever.isClosed()) { _retriever.close(); + _threadPool.Close(); } } @@ -187,7 +189,7 @@ ProtonConfigFetcher::rememberDocumentTypeRepo(std::shared_ptr<const document::Do return; // no change } auto &repos = _oldDocumentTypeRepos; - vespalib::steady_time now = vespalib::steady_clock::now(); + TimePoint now = Clock::now(); while (!repos.empty() && repos.front().first < now) { repos.pop_front(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h index 11123c49fcb..e6d036df7d9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.h @@ -26,7 +26,7 @@ class ProtonConfigFetcher : public FastOS_Runnable public: using BootstrapConfigSP = std::shared_ptr<BootstrapConfig>; - ProtonConfigFetcher(FNET_Transport & transport, const config::ConfigUri & configUri, IProtonConfigurer &owner, vespalib::duration subscribeTimeout); + ProtonConfigFetcher(const config::ConfigUri & configUri, IProtonConfigurer &owner, std::chrono::milliseconds subscribeTimeout); ~ProtonConfigFetcher() override; /** * Get the current config generation. @@ -36,7 +36,7 @@ public: /** * Start config fetcher, callbacks may come from now on. */ - void start(FastOS_ThreadPool & threadPool); + void start(); /** * Shutdown config fetcher, ensuring that no more callbacks arrive @@ -47,19 +47,20 @@ public: private: typedef std::map<DocTypeName, DocumentDBConfigManager::SP> DBManagerMap; - using OldDocumentTypeRepo = std::pair<vespalib::steady_time, std::shared_ptr<const document::DocumentTypeRepo>>; - using lock_guard = std::lock_guard<std::mutex>; - + using Clock = std::chrono::steady_clock; + using TimePoint = std::chrono::time_point<Clock>; + using OldDocumentTypeRepo = std::pair<TimePoint, std::shared_ptr<const document::DocumentTypeRepo>>; - FNET_Transport & _transport; - BootstrapConfigManager _bootstrapConfigManager; - config::ConfigRetriever _retriever; - IProtonConfigurer & _owner; + BootstrapConfigManager _bootstrapConfigManager; + config::ConfigRetriever _retriever; + IProtonConfigurer & _owner; - mutable std::mutex _mutex; // Protects maps - DBManagerMap _dbManagerMap; + mutable std::mutex _mutex; // Protects maps + using lock_guard = std::lock_guard<std::mutex>; + DBManagerMap _dbManagerMap; - std::deque<OldDocumentTypeRepo> _oldDocumentTypeRepos; + FastOS_ThreadPool _threadPool; + std::deque<OldDocumentTypeRepo> _oldDocumentTypeRepos; std::shared_ptr<const document::DocumentTypeRepo> _currentDocumentTypeRepo; void fetchConfigs(); diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index 1f06b29518b..80199c8c50c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -6,8 +6,6 @@ #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/fnet/transport.h> -#include <vespa/fastos/thread.h> using vespalib::CpuUsage; @@ -19,10 +17,8 @@ namespace proton { using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor; -SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport & transport) - : - _transport(transport), - _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)), +SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg) + : _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)), _shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki, cfg.shared_task_limit(), proton_shared_executor)), _field_writer(), diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h index 349dcc2d0ce..cd0e6d71402 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h @@ -16,16 +16,15 @@ namespace proton { class SharedThreadingService : public ISharedThreadingService { private: using Registration = std::unique_ptr<vespalib::IDestructorCallback>; - FNET_Transport & _transport; - vespalib::ThreadStackExecutor _warmup; + vespalib::ThreadStackExecutor _warmup; std::shared_ptr<vespalib::SyncableThreadExecutor> _shared; std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer; - vespalib::InvokeServiceImpl _invokeService; - std::vector<Registration> _invokeRegistrations; + vespalib::InvokeServiceImpl _invokeService; + std::vector<Registration> _invokeRegistrations; public: - SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport & transport); - ~SharedThreadingService() override; + SharedThreadingService(const SharedThreadingServiceConfig& cfg); + ~SharedThreadingService(); std::shared_ptr<vespalib::Executor> shared_raw() { return _shared; } void sync_all_executors(); @@ -34,7 +33,6 @@ public: vespalib::ThreadExecutor& shared() override { return *_shared; } vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); } vespalib::InvokeService & invokeService() override { return _invokeService; } - FNET_Transport & transport() override { return _transport; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt index e6eda87f66e..2c5dbe0317b 100644 --- a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt @@ -7,10 +7,8 @@ vespa_add_library(searchcore_test STATIC clusterstatehandler.cpp documentdb_config_builder.cpp dummy_feed_view.cpp - mock_shared_threading_service.cpp userdocumentsbuilder.cpp threading_service_observer.cpp - transport_helper.cpp DEPENDS searchcore_fconfig ) diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp deleted file mode 100644 index e976bf1d726..00000000000 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "mock_shared_threading_service.h" - -namespace proton { - -MockSharedThreadingService::MockSharedThreadingService(ThreadExecutor& warmup_in, ThreadExecutor& shared_in) - : _warmup(warmup_in), - _shared(shared_in), - _invokeService(10ms), - _transportMgr() -{ -} - -MockSharedThreadingService::~MockSharedThreadingService() = default; - -} diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h index d33afcbec3d..74965c15cd4 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "transport_helper.h" #include <vespa/searchcore/proton/server/i_shared_threading_service.h> #include <vespa/vespalib/util/invokeserviceimpl.h> @@ -9,21 +8,21 @@ namespace proton { class MockSharedThreadingService : public ISharedThreadingService { private: - using ThreadExecutor = vespalib::ThreadExecutor; - ThreadExecutor & _warmup; - ThreadExecutor & _shared; - vespalib::InvokeServiceImpl _invokeService; - TransportMgr _transportMgr; + vespalib::ThreadExecutor& _warmup; + vespalib::ThreadExecutor& _shared; + vespalib::InvokeServiceImpl _invokeService; public: - MockSharedThreadingService(ThreadExecutor& warmup_in, - ThreadExecutor& shared_in); - ~MockSharedThreadingService() override; - ThreadExecutor& warmup() override { return _warmup; } - ThreadExecutor& shared() override { return _shared; } + MockSharedThreadingService(vespalib::ThreadExecutor& warmup_in, + vespalib::ThreadExecutor& shared_in) + : _warmup(warmup_in), + _shared(shared_in), + _invokeService(10ms) + {} + vespalib::ThreadExecutor& warmup() override { return _warmup; } + vespalib::ThreadExecutor& shared() override { return _shared; } vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; } vespalib::InvokeService & invokeService() override { return _invokeService; } - FNET_Transport & transport() override { return _transportMgr.transport(); } }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp deleted file mode 100644 index 10623fb0726..00000000000 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "transport_helper.h" -#include <vespa/fnet/transport.h> -#include <vespa/fastos/thread.h> -#include <vespa/vespalib/util/size_literals.h> - -namespace proton { - -TransportMgr::TransportMgr() - : _threadPool(std::make_unique<FastOS_ThreadPool>(64_Ki)), - _transport(std::make_unique<FNET_Transport>()) -{ - _transport->Start(_threadPool.get()); -} - -TransportMgr::~TransportMgr() { - _transport->ShutDown(true); -} - -} diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h deleted file mode 100644 index 09cca2dd007..00000000000 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <memory> - -class FastOS_ThreadPool; -class FNET_Transport; - -namespace proton { - -/** - * Helper class contain a FNET_Transport object for use in tests. - **/ -class TransportMgr { -private: - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - -public: - TransportMgr(); - ~TransportMgr(); - FNET_Transport & transport() { return *_transport; } - FastOS_ThreadPool & threadPool() { return *_threadPool; } -}; - -} diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index afb1eb53417..fa0753373bf 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -7,9 +7,7 @@ #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/document/util/bytebuffer.h> #include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/destructor_callbacks.h> -#include <vespa/fnet/transport.h> #include <vespa/fastos/file.h> #include <thread> @@ -458,30 +456,12 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain) return tls.getDomainStats()[domain].maxSessionRunTime.count(); } -struct TLS { - FastOS_ThreadPool threadPool; - FNET_Transport transport; - TransLogServer tls; - TLS(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads = 4) - : threadPool(64_Ki), - transport(), - tls(transport, name, listenPort, baseDir, fileHeaderContext, cfg, maxThreads) - { - transport.Start(&threadPool); - } - ~TLS() { - transport.ShutDown(true); - } - -}; - void createAndFillDomain(const vespalib::string & dir, const vespalib::string & name, Encoding encoding, size_t preExistingDomains) { DummyFileHeaderContext fileHeaderContext; - TLS tlss(dir, 18377, ".", fileHeaderContext, - createDomainConfig(0x1000000).setEncoding(encoding), 4); + TransLogServer tlss(dir, 18377, ".", fileHeaderContext, + createDomainConfig(0x1000000).setEncoding(encoding), 4); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, name, preExistingDomains); @@ -492,7 +472,7 @@ createAndFillDomain(const vespalib::string & dir, const vespalib::string & name, void verifyDomain(const vespalib::string & dir, const vespalib::string & name) { DummyFileHeaderContext fileHeaderContext; - TLS tlss(dir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); + TransLogServer tlss(dir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); TransLogClient tls("tcp/localhost:18377"); auto s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); @@ -503,16 +483,16 @@ verifyDomain(const vespalib::string & dir, const vespalib::string & name) { void testVisitOverGeneratedDomain(const vespalib::string & testDir) { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); + TransLogServer tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); createDomainTest(tls, name); auto s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); - EXPECT_EQUAL(0, getMaxSessionRunTime(tlss.tls, "test1")); + EXPECT_EQUAL(0, getMaxSessionRunTime(tlss, "test1")); visitDomainTest(tls, s1.get(), name); - double maxSessionRunTime = getMaxSessionRunTime(tlss.tls, "test1"); + double maxSessionRunTime = getMaxSessionRunTime(tlss, "test1"); LOG(info, "testVisitOverGeneratedDomain(): maxSessionRunTime=%f", maxSessionRunTime); EXPECT_GREATER(maxSessionRunTime, 0); } @@ -521,7 +501,7 @@ void testVisitOverPreExistingDomain(const vespalib::string & testDir) { // Depends on Test::testVisitOverGeneratedDomain() DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); + TransLogServer tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -532,7 +512,7 @@ testVisitOverPreExistingDomain(const vespalib::string & testDir) { void partialUpdateTest(const vespalib::string & testDir) { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); + TransLogServer tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); TransLogClient tls("tcp/localhost:18377"); auto s1 = openDomainTest(tls, "test1"); @@ -611,7 +591,7 @@ TEST("testCrcVersions") { TEST("testRemove") { test::DirectoryHandler testDir("testremove"); DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); + TransLogServer tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test-delete"); @@ -659,14 +639,14 @@ assertStatus(Session &s, SerialNum expFirstSerial, SerialNum expLastSerial, uint void -testSendingAlotOfDataSync(const vespalib::string & testDir) { + testSendingAlotOfDataSync(const vespalib::string & testDir) { const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; const vespalib::string MANY("many"); { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); + TransLogServer tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, MANY, 0); @@ -689,7 +669,7 @@ testSendingAlotOfDataSync(const vespalib::string & testDir) { } { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); + TransLogServer tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); TransLogClient tls("tcp/localhost:18377"); auto s1 = openDomainTest(tls, "many"); @@ -710,7 +690,7 @@ testSendingAlotOfDataSync(const vespalib::string & testDir) { } { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); + TransLogServer tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); TransLogClient tls("tcp/localhost:18377"); auto s1 = openDomainTest(tls, MANY); @@ -738,11 +718,11 @@ void testSendingAlotOfDataAsync(const vespalib::string & testDir) { const vespalib::string MANY("many-async"); { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); + TransLogServer tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, MANY, 1); auto s1 = openDomainTest(tls, MANY); - fillDomainTest(tlss.tls, MANY, NUM_PACKETS, NUM_ENTRIES); + fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); EXPECT_TRUE(s1->status(b, e, c)); @@ -760,7 +740,7 @@ void testSendingAlotOfDataAsync(const vespalib::string & testDir) { } { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); + TransLogServer tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); TransLogClient tls("tcp/localhost:18377"); auto s1 = openDomainTest(tls, MANY); @@ -795,7 +775,7 @@ TEST("testErase") { test::DirectoryHandler testDir("test12"); { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); + TransLogServer tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); @@ -804,7 +784,7 @@ TEST("testErase") { } { DummyFileHeaderContext fileHeaderContext; - TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); + TransLogServer tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); TransLogClient tls("tcp/localhost:18377"); auto s1 = openDomainTest(tls, "erase"); @@ -813,7 +793,7 @@ TEST("testErase") { TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, 3, TOTAL_NUM_ENTRIES, TOTAL_NUM_ENTRIES -2, TOTAL_NUM_ENTRIES - 3)); - DomainStats domainStats = tlss.tls.getDomainStats(); + DomainStats domainStats = tlss.getDomainStats(); DomainInfo domainInfo = domainStats["erase"]; size_t numParts = domainInfo.parts.size(); LOG(info, "%zu parts", numParts); @@ -892,7 +872,7 @@ TEST("testSync") { DummyFileHeaderContext fileHeaderContext; test::DirectoryHandler testDir("test9"); - TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); + TransLogServer tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -915,7 +895,7 @@ TEST("test truncate on version mismatch") { DummyFileHeaderContext fileHeaderContext; test::DirectoryHandler testDir("test11"); { - TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); + TransLogServer tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -936,7 +916,7 @@ TEST("test truncate on version mismatch") { EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp))); EXPECT_TRUE(f.Close()); { - TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); + TransLogServer tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); TransLogClient tls("tcp/localhost:18377"); auto s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); @@ -962,7 +942,7 @@ TEST("test truncation after short read") { DomainConfig domainConfig = createDomainConfig(0x10000); DummyFileHeaderContext fileHeaderContext; { - TLS tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); + TransLogServer tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); @@ -976,7 +956,7 @@ TEST("test truncation after short read") { } EXPECT_EQUAL(2u, countFiles(dir)); { - TLS tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); + TransLogServer tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); TransLogClient tls(tlsspec); auto s1 = openDomainTest(tls, domain); checkFilledDomainTest(*s1, TOTAL_NUM_ENTRIES); @@ -989,7 +969,7 @@ TEST("test truncation after short read") { trfile.SetSize(trfile.getSize() - 1); } { - TLS tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); + TransLogServer tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); TransLogClient tls(tlsspec); auto s1 = openDomainTest(tls, domain); checkFilledDomainTest(*s1, TOTAL_NUM_ENTRIES - 1); diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 3d8379adba2..6c0f312f0fb 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -4,10 +4,8 @@ #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/searchlib/transactionlog/translogclient.h> #include <vespa/vespalib/util/rand48.h> -#include <vespa/vespalib/util/size_literals.h> #include <vespa/searchlib/util/runnable.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> -#include <vespa/fnet/transport.h> #include <vespa/fastos/app.h> #include <iostream> #include <sstream> @@ -700,13 +698,13 @@ TransLogStress::Main() } // start transaction log server - FastOS_ThreadPool threadPool(256_Ki); - FNET_Transport transport; DummyFileHeaderContext fileHeaderContext; - TransLogServer tls(transport, "server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); + TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); TransLogClient client(tlsSpec); client.create(domain); + FastOS_ThreadPool threadPool(256000); + BufferGenerator bufferGenerator(_cfg.minStrLen, _cfg.maxStrLen); bufferGenerator.setSeed(_cfg.baseSeed); std::vector<nbostream> buffers; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index ee9b0ce81e8..ccef92c0802 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -81,19 +81,19 @@ VESPA_THREAD_STACK_TAG(tls_executor); } -TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::string &name, int listenPort, const vespalib::string &baseDir, +TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext) - : TransLogServer(transport, name, listenPort, baseDir, fileHeaderContext, + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::zstd)) .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000)) {} -TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::string &name, int listenPort, const vespalib::string &baseDir, +TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg) - : TransLogServer(transport, name, listenPort, baseDir, fileHeaderContext, cfg, 4) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4) {} -TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::string &name, int listenPort, const vespalib::string &baseDir, +TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads) : FRT_Invokable(), _name(name), @@ -101,7 +101,8 @@ TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::strin _domainConfig(cfg), _executor(maxThreads, 128_Ki, CpuUsage::wrap(tls_executor, CpuUsage::Category::WRITE)), _threadPool(std::make_unique<FastOS_ThreadPool>(120_Ki)), - _supervisor(std::make_unique<FRT_Supervisor>(&transport)), + _transport(std::make_unique<FNET_Transport>()), + _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _domains(), _reqQ(), _fileHeaderContext(fileHeaderContext), @@ -129,6 +130,7 @@ TransLogServer::TransLogServer(FNET_Transport & transport, const vespalib::strin bool listenOk(false); for (int i(600); !listenOk && i; i--) { if (_supervisor->Listen(listenSpec)) { + _transport->Start(_threadPool.get()); listenOk = true; } else { LOG(warning, "Failed listening at port %s trying for %d seconds more.", listenSpec, i); @@ -155,6 +157,7 @@ TransLogServer::~TransLogServer() _executor.sync(); _executor.shutdown(); _executor.sync(); + _transport->ShutDown(true); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 9dcd8cdb6e5..48f6f53dd2a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -24,11 +24,11 @@ public: friend class TransLogServerExplorer; using SP = std::shared_ptr<TransLogServer>; using DomainSP = std::shared_ptr<Domain>; - TransLogServer(FNET_Transport & transport, const vespalib::string &name, int listenPort, const vespalib::string &baseDir, + TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); - TransLogServer(FNET_Transport & transport, const vespalib::string &name, int listenPort, const vespalib::string &baseDir, + TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg); - TransLogServer(FNET_Transport & transport, const vespalib::string &name, int listenPort, const vespalib::string &baseDir, + TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; @@ -85,6 +85,7 @@ private: DomainConfig _domainConfig; vespalib::ThreadStackExecutor _executor; std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _supervisor; DomainList _domains; mutable std::shared_mutex _domainMutex;; // Protects _domains diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index 5c960625a79..927047cecaa 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -89,13 +89,13 @@ derive_num_threads(uint32_t configured_cores, uint32_t actual_cores) { } void -TransLogServerApp::start(FNET_Transport & transport, uint32_t num_cores) +TransLogServerApp::start(uint32_t num_cores) { std::lock_guard<std::mutex> guard(_lock); auto c = _tlsConfig.get(); DomainConfig domainConfig = getDomainConfig(*c); logReconfig(*c, domainConfig); - _tls = std::make_shared<TransLogServer>(transport, c->servername, c->listenport, c->basedir, _fileHeaderContext, + _tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, domainConfig, derive_num_threads(c->maxthreads, num_cores)); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index f5979750b5e..ccd207ed6d1 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -32,7 +32,7 @@ public: TransLogServer::SP getTransLogServer() const; - void start(FNET_Transport & transport, uint32_t num_cores); + void start(uint32_t num_cores); }; } diff --git a/staging_vespalib/src/tests/timer/timer_test.cpp b/staging_vespalib/src/tests/timer/timer_test.cpp index 1f0ee81e4e6..9d04500b8cd 100644 --- a/staging_vespalib/src/tests/timer/timer_test.cpp +++ b/staging_vespalib/src/tests/timer/timer_test.cpp @@ -2,15 +2,18 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/scheduledexecutor.h> -#include <vespa/vespalib/util/size_literals.h> -#include <vespa/fnet/transport.h> -#include <vespa/fastos/thread.h> using namespace vespalib; using vespalib::Executor; typedef Executor::Task Task; -namespace { +class Test : public TestApp +{ +public: + int Main() override; + void testScheduling(); + void testReset(); +}; class TestTask : public Task { private: @@ -20,36 +23,35 @@ public: void run() override { _latch.countDown(); } }; +int +Test::Main() +{ + TEST_INIT("timer_test"); + testScheduling(); + testReset(); + TEST_DONE(); } -TEST("testScheduling") { +void Test::testScheduling() +{ vespalib::CountDownLatch latch1(3); vespalib::CountDownLatch latch2(2); - FastOS_ThreadPool threadPool(64_Ki); - FNET_Transport transport; - transport.Start(&threadPool); - ScheduledExecutor timer(transport); + ScheduledExecutor timer; timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 100ms, 200ms); timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch2), 500ms, 500ms); EXPECT_TRUE(latch1.await(60s)); EXPECT_TRUE(latch2.await(60s)); - timer.reset(); - transport.ShutDown(true); } -TEST("testReset") { +void Test::testReset() +{ vespalib::CountDownLatch latch1(2); - FastOS_ThreadPool threadPool(64_Ki); - FNET_Transport transport; - transport.Start(&threadPool); - ScheduledExecutor timer(transport); + ScheduledExecutor timer; timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 2s, 3s); timer.reset(); EXPECT_TRUE(!latch1.await(3s)); timer.scheduleAtFixedRate(std::make_unique<TestTask>(latch1), 200ms, 300ms); EXPECT_TRUE(latch1.await(60s)); - timer.reset(); - transport.ShutDown(true); } -TEST_MAIN() { TEST_RUN_ALL(); } +TEST_APPHOOK(Test) diff --git a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp index 99254240f3c..f5384999cb8 100644 --- a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.cpp @@ -3,10 +3,11 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/task.h> #include <vespa/fnet/transport.h> +#include <vespa/vespalib/util/size_literals.h> namespace vespalib { -using Task = vespalib::Executor::Task; +typedef vespalib::Executor::Task Task; class TimerTask : public FNET_Task { @@ -15,8 +16,8 @@ private: TimerTask&operator=(const TimerTask &); FNET_Scheduler *_scheduler; - Task::UP _task; - duration _interval; + Task::UP _task; + duration _interval; public: TimerTask(FNET_Scheduler *scheduler, Task::UP task, duration interval) : FNET_Task(scheduler), @@ -34,15 +35,21 @@ public: } }; -ScheduledExecutor::ScheduledExecutor(FNET_Transport & transport) - : _transport(transport), +ScheduledExecutor::ScheduledExecutor() + : _threadPool(128_Ki), + _transport(new FNET_Transport()), _lock(), _taskList() -{ } +{ + _transport->Start(&_threadPool); +} ScheduledExecutor::~ScheduledExecutor() { - reset(); + std::lock_guard guard(_lock); + _transport->ShutDown(true); + _threadPool.Close(); + _taskList.clear(); } @@ -50,7 +57,7 @@ void ScheduledExecutor::scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval) { std::lock_guard guard(_lock); - auto tTask = std::make_unique<TimerTask>(_transport.GetScheduler(), std::move(task), interval); + TimerTaskPtr tTask(new TimerTask(_transport->GetScheduler(), std::move(task), interval)); _taskList.push_back(std::move(tTask)); _taskList.back()->Schedule(to_s(delay)); } @@ -59,10 +66,10 @@ void ScheduledExecutor::reset() { std::lock_guard guard(_lock); - for (auto & task : _taskList) { - task->Unschedule(); - } + _transport->ShutDown(true); _taskList.clear(); + _transport = std::make_unique<FNET_Transport>(); + _transport->Start(&_threadPool); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h index 1be49f6ff45..dcc16a8c379 100644 --- a/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/scheduledexecutor.h @@ -3,6 +3,7 @@ #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/time.h> +#include <vespa/fastos/thread.h> #include <vector> class FNET_Transport; @@ -19,8 +20,10 @@ class TimerTask; class ScheduledExecutor { private: - using TaskList = std::vector<std::unique_ptr<TimerTask>>; - FNET_Transport & _transport; + typedef std::unique_ptr<TimerTask> TimerTaskPtr; + typedef std::vector<TimerTaskPtr> TaskList; + FastOS_ThreadPool _threadPool; + std::unique_ptr<FNET_Transport> _transport; std::mutex _lock; TaskList _taskList; @@ -28,7 +31,7 @@ public: /** * Create a new timer, capable of scheduling tasks at fixed intervals. */ - ScheduledExecutor(FNET_Transport & transport); + ScheduledExecutor(); /** * Destroys this timer, finishing the current task executing and then @@ -43,7 +46,7 @@ public: * @param delay The delay to wait before first execution. * @param interval The interval in seconds. */ - void scheduleAtFixedRate(std::unique_ptr<Executor::Task> task, duration delay, duration interval); + void scheduleAtFixedRate(vespalib::Executor::Task::UP task, duration delay, duration interval); /** * Reset timer, clearing the list of task to execute. |