From 8a382bd5819f2e2f7653f604cc018b394770fe44 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 21 Feb 2022 19:01:06 +0100 Subject: Revert "Revert "Use a common FNET_Transport owned by Proton in both SceduledExecutor …"" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- searchcore/src/apps/tests/CMakeLists.txt | 1 + .../src/apps/tests/persistenceconformance_test.cpp | 14 +++---- .../vespa-transactionlog-inspect.cpp | 14 ++++++- .../src/tests/proton/docsummary/docsummary.cpp | 10 ++--- .../src/tests/proton/documentdb/CMakeLists.txt | 1 + .../documentdb/document_subdbs/CMakeLists.txt | 1 + .../document_subdbs/document_subdbs_test.cpp | 29 +++++++------- .../tests/proton/documentdb/documentdb_test.cpp | 6 +-- .../documentdb/feedhandler/feedhandler_test.cpp | 6 ++- .../fileconfigmanager/fileconfigmanager_test.cpp | 44 +++++++++++----------- .../maintenancecontroller_test.cpp | 5 ++- .../proton/proton_config_fetcher/CMakeLists.txt | 1 + .../proton_config_fetcher_test.cpp | 19 ++++++---- .../tests/proton/proton_disk_layout/CMakeLists.txt | 1 + .../proton_disk_layout/proton_disk_layout_test.cpp | 6 ++- .../server/disk_mem_usage_sampler/CMakeLists.txt | 1 + .../disk_mem_usage_sampler_test.cpp | 21 ++++++----- .../server/shared_threading_service/CMakeLists.txt | 1 + .../shared_threading_service_test.cpp | 11 ++++-- .../src/vespa/searchcore/bmcluster/CMakeLists.txt | 1 + .../src/vespa/searchcore/bmcluster/bm_node.cpp | 8 ++-- .../proton/server/disk_mem_usage_sampler.cpp | 7 ++-- .../proton/server/disk_mem_usage_sampler.h | 5 ++- .../vespa/searchcore/proton/server/documentdb.cpp | 3 +- .../proton/server/documentdbconfigmanager.cpp | 25 ++++++------ .../proton/server/documentdbconfigmanager.h | 6 ++- .../searchcore/proton/server/fileconfigmanager.cpp | 8 ++-- .../searchcore/proton/server/fileconfigmanager.h | 16 ++++---- .../proton/server/i_shared_threading_service.h | 9 ++++- .../proton/server/maintenancecontroller.cpp | 10 +++-- .../proton/server/maintenancecontroller.h | 2 +- .../src/vespa/searchcore/proton/server/proton.cpp | 25 ++++++------ .../src/vespa/searchcore/proton/server/proton.h | 7 ++-- .../proton/server/proton_config_fetcher.cpp | 18 ++++----- .../proton/server/proton_config_fetcher.h | 25 ++++++------ .../proton/server/shared_threading_service.cpp | 8 +++- .../proton/server/shared_threading_service.h | 12 +++--- .../vespa/searchcore/proton/test/CMakeLists.txt | 2 + .../proton/test/mock_shared_threading_service.cpp | 17 +++++++++ .../proton/test/mock_shared_threading_service.h | 23 +++++------ .../searchcore/proton/test/transport_helper.cpp | 21 +++++++++++ .../searchcore/proton/test/transport_helper.h | 26 +++++++++++++ 42 files changed, 305 insertions(+), 171 deletions(-) create mode 100644 searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp create mode 100644 searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp create mode 100644 searchcore/src/vespa/searchcore/proton/test/transport_helper.h (limited to 'searchcore') diff --git a/searchcore/src/apps/tests/CMakeLists.txt b/searchcore/src/apps/tests/CMakeLists.txt index 12e19d40aca..532c89d6ab7 100644 --- a/searchcore/src/apps/tests/CMakeLists.txt +++ b/searchcore/src/apps/tests/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_persistenceconformance_test_app TEST SOURCES persistenceconformance_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_initializer searchcore_reprocessing diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 483cc3f2792..fd0ace21955 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -169,14 +169,14 @@ class DocumentDBFactory : public DummyDBOwner { private: vespalib::string _baseDir; DummyFileHeaderContext _fileHeaderContext; - TransLogServer _tls; vespalib::string _tlsSpec; matching::QueryLimiter _queryLimiter; vespalib::Clock _clock; - mutable DummyWireService _metricsWireService; - mutable MemoryConfigStores _config_stores; + mutable DummyWireService _metricsWireService; + mutable MemoryConfigStores _config_stores; vespalib::ThreadStackExecutor _summaryExecutor; - MockSharedThreadingService _shared_service; + MockSharedThreadingService _shared_service; + TransLogServer _tls; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; static std::shared_ptr make_proton_config() { @@ -196,7 +196,7 @@ public: vespalib::mkdir(_baseDir + "/" + docType.toString(), false); vespalib::string inputCfg = _baseDir + "/" + docType.toString() + "/baseconfig"; { - FileConfigManager fileCfg(inputCfg, "", docType.getName()); + FileConfigManager fileCfg(_shared_service.transport(), inputCfg, "", docType.getName()); fileCfg.saveConfig(*snapshot, 1); } config::DirSpec spec(inputCfg + "/config-1"); @@ -208,7 +208,7 @@ public: std::make_shared(), tuneFileDocDB, HwInfo()); mgr.forwardConfig(b); - mgr.nextGeneration(0ms); + mgr.nextGeneration(_shared_service.transport(), 0ms); return DocumentDB::create(_baseDir, mgr.getConfig(), _tlsSpec, _queryLimiter, _clock, docType, bucketSpace, *b->getProtonConfigSP(), const_cast(*this), _shared_service, _bucketExecutor, _tls, _metricsWireService, @@ -221,13 +221,13 @@ public: DocumentDBFactory::DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort) : _baseDir(baseDir), _fileHeaderContext(), - _tls("tls", tlsListenPort, baseDir, _fileHeaderContext), _tlsSpec(vespalib::make_string("tcp/localhost:%d", tlsListenPort)), _queryLimiter(), _clock(), _metricsWireService(), _summaryExecutor(8, 128_Ki), _shared_service(_summaryExecutor, _summaryExecutor), + _tls(_shared_service.transport(), "tls", tlsListenPort, baseDir, _fileHeaderContext), _bucketExecutor(2) {} DocumentDBFactory::~DocumentDBFactory() = default; diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index a7f23c382e9..40431b90e27 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -8,12 +8,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -324,7 +326,7 @@ public: */ struct Utility { - virtual ~Utility() {} + virtual ~Utility() = default; typedef std::unique_ptr UP; virtual int run() = 0; }; @@ -371,6 +373,8 @@ class BaseUtility : public Utility protected: const BaseOptions &_bopts; DummyFileHeaderContext _fileHeader; + FastOS_ThreadPool _threadPool; + FNET_Transport _transport; TransLogServer _server; client::TransLogClient _client; @@ -378,9 +382,15 @@ public: BaseUtility(const BaseOptions &bopts) : _bopts(bopts), _fileHeader(), - _server(_bopts.tlsName, _bopts.listenPort, _bopts.tlsDir, _fileHeader), + _threadPool(64_Ki), + _transport(), + _server(_transport, _bopts.tlsName, _bopts.listenPort, _bopts.tlsDir, _fileHeader), _client(vespalib::make_string("tcp/localhost:%d", _bopts.listenPort)) { + _transport.Start(&_threadPool); + } + ~BaseUtility() override { + _transport.ShutDown(true); } virtual int run() override = 0; }; diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 7eda54cdbee..faea1cc8b7c 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -176,10 +176,10 @@ class DBContext : public DummyDBOwner { public: DirMaker _dmk; - DummyFileHeaderContext _fileHeaderContext; - TransLogServer _tls; + DummyFileHeaderContext _fileHeaderContext; vespalib::ThreadStackExecutor _summaryExecutor; - MockSharedThreadingService _shared_service; + MockSharedThreadingService _shared_service; + TransLogServer _tls; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; bool _mkdirOk; matching::QueryLimiter _queryLimiter; @@ -198,9 +198,9 @@ public: DBContext(const std::shared_ptr &repo, const char *docTypeName) : _dmk(docTypeName), _fileHeaderContext(), - _tls("tmp", 9013, ".", _fileHeaderContext), _summaryExecutor(8, 128_Ki), _shared_service(_summaryExecutor, _summaryExecutor), + _tls(_shared_service.transport(), "tmp", 9013, ".", _fileHeaderContext), _bucketExecutor(2), _mkdirOk(FastOS_File::MakeDirectory("tmpdb")), _queryLimiter(), @@ -223,7 +223,7 @@ public: std::make_shared(), _tuneFileDocumentDB, _hwInfo); _configMgr.forwardConfig(b); - _configMgr.nextGeneration(0ms); + _configMgr.nextGeneration(_shared_service.transport(), 0ms); if (! FastOS_File::MakeDirectory((std::string("tmpdb/") + docTypeName).c_str())) { LOG_ABORT("should not be reached"); } diff --git a/searchcore/src/tests/proton/documentdb/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/CMakeLists.txt index 8ebeaef9a99..d36368a8ebd 100644 --- a/searchcore/src/tests/proton/documentdb/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_documentdb_test_app TEST SOURCES documentdb_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_initializer searchcore_reprocessing diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/document_subdbs/CMakeLists.txt index 0bc1235bb6f..2706d183988 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_document_subdbs_test_app TEST SOURCES document_subdbs_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_initializer searchcore_reprocessing diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index b9e3549053a..6e20d30fb36 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -261,7 +262,7 @@ struct MyConfigSnapshot DocBuilder _builder; DocumentDBConfig::SP _cfg; BootstrapConfig::SP _bootstrap; - MyConfigSnapshot(const Schema &schema, const vespalib::string &cfgDir) + MyConfigSnapshot(FNET_Transport & transport, const Schema &schema, const vespalib::string &cfgDir) : _schema(schema), _builder(_schema), _cfg(), @@ -279,7 +280,7 @@ struct MyConfigSnapshot ::config::DirSpec spec(cfgDir); DocumentDBConfigHelper mgr(spec, "searchdocument"); mgr.forwardConfig(_bootstrap); - mgr.nextGeneration(1ms); + mgr.nextGeneration(transport, 1ms); _cfg = mgr.getConfig(); } }; @@ -287,26 +288,28 @@ struct MyConfigSnapshot template struct FixtureBase { - ThreadStackExecutor _summaryExecutor; + TransportMgr _transport; + ThreadStackExecutor _summaryExecutor; ExecutorThreadingService _writeService; - typename Traits::Config _cfg; + typename Traits::Config _cfg; std::shared_ptr _bucketDB; - BucketDBHandler _bucketDBHandler; + BucketDBHandler _bucketDBHandler; typename Traits::Context _ctx; - typename Traits::Schema _baseSchema; - MyConfigSnapshot::UP _snapshot; - DirectoryHandler _baseDir; - typename Traits::SubDB _subDb; - IFeedView::SP _tmpFeedView; + typename Traits::Schema _baseSchema; + MyConfigSnapshot::UP _snapshot; + DirectoryHandler _baseDir; + typename Traits::SubDB _subDb; + IFeedView::SP _tmpFeedView; FixtureBase() - : _summaryExecutor(1, 64_Ki), + : _transport(), + _summaryExecutor(1, 64_Ki), _writeService(_summaryExecutor), _cfg(), _bucketDB(std::make_shared()), _bucketDBHandler(*_bucketDB), _ctx(_writeService, _bucketDB, _bucketDBHandler), _baseSchema(), - _snapshot(std::make_unique(_baseSchema, Traits::ConfigDir::dir())), + _snapshot(std::make_unique(_transport.transport(), _baseSchema, Traits::ConfigDir::dir())), _baseDir(BASE_DIR + "/" + SUB_NAME, BASE_DIR), _subDb(_cfg._cfg, _ctx._ctx), _tmpFeedView() @@ -346,7 +349,7 @@ struct FixtureBase runInMasterAndSync([&]() { performReconfig(serialNum, reconfigSchema, reconfigConfigDir); }); } void performReconfig(SerialNum serialNum, const Schema &reconfigSchema, const vespalib::string &reconfigConfigDir) { - auto newCfg = std::make_unique(reconfigSchema, reconfigConfigDir); + auto newCfg = std::make_unique(_transport.transport(), reconfigSchema, reconfigConfigDir); DocumentDBConfig::ComparisonResult cmpResult; cmpResult.attributesChanged = true; cmpResult.documenttypesChanged = true; diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index 7ba3e0b8240..bef8d0c49bb 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -149,7 +149,7 @@ Fixture::Fixture(bool file_config) _bucketExecutor(2), _db(), _fileHeaderContext(), - _tls("tmp", 9014, ".", _fileHeaderContext), + _tls(_shared_service.transport(), "tmp", 9014, ".", _fileHeaderContext), _queryLimiter(), _clock() { @@ -165,7 +165,7 @@ Fixture::Fixture(bool file_config) std::make_shared(), tuneFileDocumentDB, HwInfo()); mgr.forwardConfig(b); - mgr.nextGeneration(0ms); + mgr.nextGeneration(_shared_service.transport(), 0ms); _db = DocumentDB::create(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"), makeBucketSpace(), *b->getProtonConfigSP(), _myDBOwner, _shared_service, _bucketExecutor, _tls, _dummy, @@ -183,7 +183,7 @@ std::unique_ptr Fixture::make_config_store() { if (_file_config) { - return std::make_unique("config", "", "typea"); + return std::make_unique(_shared_service.transport(), "config", "", "typea"); } else { return std::make_unique(); } diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 904937a26da..9a8d8bad60e 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -27,12 +27,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #include #include @@ -407,6 +409,7 @@ struct MyTlsWriter : TlsWriter { struct FeedHandlerFixture { DummyFileHeaderContext _fileHeaderContext; + TransportMgr _transport; TransLogServer tls; vespalib::string tlsSpec; vespalib::ThreadStackExecutor sharedExecutor; @@ -423,7 +426,8 @@ struct FeedHandlerFixture FeedHandler handler; FeedHandlerFixture() : _fileHeaderContext(), - tls("mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)), + _transport(), + tls(_transport.transport(), "mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)), tlsSpec("tcp/localhost:9016"), sharedExecutor(1, 0x10000), writeService(sharedExecutor), diff --git a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp index 68b9d2f8d6e..d51ea25f2f5 100644 --- a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp +++ b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp @@ -13,11 +13,13 @@ #include #include #include +#include #include #include #include #include + using namespace cloud::config::filedistribution; using namespace config; using namespace document; @@ -39,7 +41,7 @@ using vespalib::nbostream; vespalib::string myId("myconfigid"); DocumentDBConfig::SP -makeBaseConfigSnapshot() +makeBaseConfigSnapshot(FNET_Transport & transport) { ::config::DirSpec spec(TEST_PATH("cfg")); @@ -52,7 +54,7 @@ makeBaseConfigSnapshot() std::make_shared(), std::make_shared(), HwInfo()); dbcm.forwardConfig(b); - dbcm.nextGeneration(0ms); + dbcm.nextGeneration(transport, 0ms); DocumentDBConfig::SP snap = dbcm.getConfig(); snap->setConfigId(myId); ASSERT_TRUE(snap); @@ -60,9 +62,9 @@ makeBaseConfigSnapshot() } void -saveBaseConfigSnapshot(const DocumentDBConfig &snap, SerialNum num) +saveBaseConfigSnapshot(FNET_Transport & transport, const DocumentDBConfig &snap, SerialNum num) { - FileConfigManager cm("out", myId, snap.getDocTypeName()); + FileConfigManager cm(transport, "out", myId, snap.getDocTypeName()); cm.saveConfig(snap, num); } @@ -125,55 +127,55 @@ addConfigsThatAreNotSavedToDisk(const DocumentDBConfig &cfg) return builder.build(); } -TEST_F("requireThatConfigCanBeSavedAndLoaded", DocumentDBConfig::SP(makeBaseConfigSnapshot())) +TEST_FF("requireThatConfigCanBeSavedAndLoaded", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) { - DocumentDBConfig::SP fullCfg = addConfigsThatAreNotSavedToDisk(*f); - saveBaseConfigSnapshot(*fullCfg, 20); + DocumentDBConfig::SP fullCfg = addConfigsThatAreNotSavedToDisk(*f2); + saveBaseConfigSnapshot(f1.transport(), *fullCfg, 20); DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot()); { - FileConfigManager cm("out", myId, "dummy"); + FileConfigManager cm(f1.transport(), "out", myId, "dummy"); cm.loadConfig(*esnap, 20, esnap); } - assertEqualSnapshot(*f, *esnap); + assertEqualSnapshot(*f2, *esnap); } -TEST_F("requireThatConfigCanBeSerializedAndDeserialized", DocumentDBConfig::SP(makeBaseConfigSnapshot())) +TEST_FF("requireThatConfigCanBeSerializedAndDeserialized", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) { - saveBaseConfigSnapshot(*f, 30); + saveBaseConfigSnapshot(f1.transport(), *f2, 30); nbostream stream; { - FileConfigManager cm("out", myId, "dummy"); + FileConfigManager cm(f1.transport(), "out", myId, "dummy"); cm.serializeConfig(30, stream); } { - FileConfigManager cm("out", myId, "dummy"); + FileConfigManager cm(f1.transport(), "out", myId, "dummy"); cm.deserializeConfig(40, stream); } DocumentDBConfig::SP fsnap(makeEmptyConfigSnapshot()); { - FileConfigManager cm("out", myId, "dummy"); + FileConfigManager cm(f1.transport(), "out", myId, "dummy"); cm.loadConfig(*fsnap, 40, fsnap); } - assertEqualSnapshot(*f, *fsnap); + assertEqualSnapshot(*f2, *fsnap); EXPECT_EQUAL("dummy", fsnap->getDocTypeName()); } -TEST_F("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", DocumentDBConfig::SP(makeBaseConfigSnapshot())) +TEST_FF("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) { - saveBaseConfigSnapshot(*f, 70); + saveBaseConfigSnapshot(f1.transport(), *f2, 70); EXPECT_FALSE(vespalib::unlink("out/config-70/extraconfigs.dat")); DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot()); { - FileConfigManager cm("out", myId, "dummy"); + FileConfigManager cm(f1.transport(), "out", myId, "dummy"); cm.loadConfig(*esnap, 70, esnap); } } -TEST_F("requireThatVisibilityDelayIsPropagated", DocumentDBConfig::SP(makeBaseConfigSnapshot())) +TEST_FF("requireThatVisibilityDelayIsPropagated", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) { - saveBaseConfigSnapshot(*f, 80); + saveBaseConfigSnapshot(f1.transport(), *f2, 80); DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot()); { ProtonConfigBuilder protonConfigBuilder; @@ -182,7 +184,7 @@ TEST_F("requireThatVisibilityDelayIsPropagated", DocumentDBConfig::SP(makeBaseCo ddb.visibilitydelay = 61.0; protonConfigBuilder.documentdb.push_back(ddb); protonConfigBuilder.maxvisibilitydelay = 100.0; - FileConfigManager cm("out", myId, "dummy"); + FileConfigManager cm(f1.transport(), "out", myId, "dummy"); cm.setProtonConfig(std::make_shared(protonConfigBuilder)); cm.loadConfig(*esnap, 70, esnap); } diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 9657619be40..39a5c69376c 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -348,6 +349,7 @@ public: test::DiskMemUsageNotifier _diskMemUsageNotifier; BucketCreateNotifier _bucketCreateNotifier; MonitoredRefCount _refCount; + TransportMgr _transport; MaintenanceController _mc; MaintenanceControllerFixture(); @@ -767,7 +769,8 @@ MaintenanceControllerFixture::MaintenanceControllerFixture() _attributeUsageFilter(), _bucketCreateNotifier(), _refCount(), - _mc(_threadService, _genericExecutor, _refCount, _docTypeName) + _transport(), + _mc(_transport.transport(), _threadService, _genericExecutor, _refCount, _docTypeName) { std::vector subDBs; subDBs.push_back(&_ready); diff --git a/searchcore/src/tests/proton/proton_config_fetcher/CMakeLists.txt b/searchcore/src/tests/proton/proton_config_fetcher/CMakeLists.txt index 88332f360ce..20629733f09 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/CMakeLists.txt +++ b/searchcore/src/tests/proton/proton_config_fetcher/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_proton_config_fetcher_test_app TEST SOURCES proton_config_fetcher_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_fconfig ) diff --git a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp index 3c9e9fc3a64..b4193b0e0b2 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp +++ b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -17,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -63,6 +65,7 @@ struct DoctypeFixture { struct ConfigTestFixture { const std::string configId; + TransportMgr transport; ProtonConfigBuilder protonBuilder; DocumenttypesConfigBuilder documenttypesBuilder; FiledistributorrpcConfigBuilder filedistBuilder; @@ -90,6 +93,8 @@ struct ConfigTestFixture { addDocType("_alwaysthere_"); } + ~ConfigTestFixture() = default; + DoctypeFixture *addDocType(const std::string &name, bool isGlobal = false) { DocumenttypesConfigBuilder::Documenttype dt; dt.bodystruct = -1270491200; @@ -249,7 +254,7 @@ getDocumentDBConfig(ConfigTestFixture &f, DocumentDBConfigManager &mgr, const Hw { ConfigRetriever retriever(mgr.createConfigKeySet(), f.context); mgr.forwardConfig(f.getBootstrapConfig(1, hwInfo)); - mgr.update(retriever.getBootstrapConfigs()); // Cheating, but we only need the configs + mgr.update(f.transport.transport(), retriever.getBootstrapConfigs()); // Cheating, but we only need the configs return mgr.getConfig(); } @@ -300,8 +305,8 @@ TEST_FF("require that documentdb config manager builds schema with imported attr TEST_FFF("require that proton config fetcher follows changes to bootstrap", ConfigTestFixture("search"), ProtonConfigOwner(), - ProtonConfigFetcher(ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(); + ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { + f3.start(f1.transport.threadPool()); ASSERT_TRUE(f2._configured); ASSERT_TRUE(f1.configEqual(f2.getBootstrapConfig())); f2._configured = false; @@ -315,8 +320,8 @@ TEST_FFF("require that proton config fetcher follows changes to bootstrap", TEST_FFF("require that proton config fetcher follows changes to doctypes", ConfigTestFixture("search"), ProtonConfigOwner(), - ProtonConfigFetcher(ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(); + ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { + f3.start(f1.transport.threadPool()); f2._configured = false; f1.addDocType("typea"); @@ -335,8 +340,8 @@ TEST_FFF("require that proton config fetcher follows changes to doctypes", TEST_FFF("require that proton config fetcher reconfigures dbowners", ConfigTestFixture("search"), ProtonConfigOwner(), - ProtonConfigFetcher(ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(); + ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { + f3.start(f1.transport.threadPool()); ASSERT_FALSE(f2.getDocumentDBConfig("typea")); // Add db and verify that config for db is provided diff --git a/searchcore/src/tests/proton/proton_disk_layout/CMakeLists.txt b/searchcore/src/tests/proton/proton_disk_layout/CMakeLists.txt index 1c22818630c..5b6c84393f7 100644 --- a/searchcore/src/tests/proton/proton_disk_layout/CMakeLists.txt +++ b/searchcore/src/tests/proton/proton_disk_layout/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_proton_disk_layout_test_app TEST SOURCES proton_disk_layout_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_fconfig ) diff --git a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp index 17bd5991b68..08b9a55746b 100644 --- a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp +++ b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -15,6 +16,7 @@ using search::transactionlog::client::TransLogClient; using search::transactionlog::TransLogServer; using proton::DocTypeName; using proton::ProtonDiskLayout; +using proton::TransportMgr; static constexpr unsigned int tlsPort = 9018; @@ -29,6 +31,7 @@ struct FixtureBase struct DiskLayoutFixture { DummyFileHeaderContext _fileHeaderContext; + TransportMgr _transport; TransLogServer _tls; vespalib::string _tlsSpec; ProtonDiskLayout _diskLayout; @@ -91,7 +94,8 @@ struct DiskLayoutFixture { DiskLayoutFixture::DiskLayoutFixture() : _fileHeaderContext(), - _tls("tls", tlsPort, baseDir, _fileHeaderContext), + _transport(), + _tls(_transport.transport(), "tls", tlsPort, baseDir, _fileHeaderContext), _tlsSpec(vespalib::make_string("tcp/localhost:%u", tlsPort)), _diskLayout(baseDir, _tlsSpec) { diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/CMakeLists.txt b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/CMakeLists.txt index 02b3483153d..0b126ccff27 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/CMakeLists.txt +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_disk_mem_usage_sampler_test_app TEST SOURCES disk_mem_usage_sampler_test.cpp DEPENDS + searchcore_test searchcore_server GTest::GTest ) diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp index 0c80553e1e7..9469c8b055f 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp @@ -3,8 +3,8 @@ #include #include #include +#include #include -#include #include #include @@ -38,16 +38,19 @@ public: }; struct DiskMemUsageSamplerTest : public ::testing::Test { - DiskMemUsageSampler sampler; - DiskMemUsageSamplerTest(): - sampler(".", - DiskMemUsageSampler::Config(0.8, 0.8, - 50ms, make_hw_info())) + TransportMgr transport; + std::unique_ptr sampler; + DiskMemUsageSamplerTest() + : transport(), + sampler(std::make_unique(transport.transport(), ".", DiskMemUsageSampler::Config(0.8, 0.8, 50ms, make_hw_info()))) { - sampler.add_transient_usage_provider(std::make_shared(50, 200)); - sampler.add_transient_usage_provider(std::make_shared(100, 150)); + sampler->add_transient_usage_provider(std::make_shared(50, 200)); + sampler->add_transient_usage_provider(std::make_shared(100, 150)); } - const DiskMemUsageFilter& filter() const { return sampler.writeFilter(); } + ~DiskMemUsageSamplerTest() { + sampler.reset(); + } + const DiskMemUsageFilter& filter() const { return sampler->writeFilter(); } }; TEST_F(DiskMemUsageSamplerTest, resource_usage_is_sampled) diff --git a/searchcore/src/tests/proton/server/shared_threading_service/CMakeLists.txt b/searchcore/src/tests/proton/server/shared_threading_service/CMakeLists.txt index 9b40ae19c99..61325ccb125 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/CMakeLists.txt +++ b/searchcore/src/tests/proton/server/shared_threading_service/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_shared_threading_service_test_app TEST SOURCES shared_threading_service_test.cpp DEPENDS + searchcore_test searchcore_server GTest::GTest ) diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp index bede9f967a4..1b8c8d8491b 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp +++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -47,14 +48,16 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores class SharedThreadingServiceTest : public ::testing::Test { public: + TransportMgr transport; std::unique_ptr service; SharedThreadingServiceTest() - : service() - { - } + : transport(), + service() + { } + ~SharedThreadingServiceTest() = default; void setup(double concurrency, uint32_t cpu_cores) { service = std::make_unique( - SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores))); + SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)), transport.transport()); } SequencedTaskExecutor* field_writer() { return dynamic_cast(service->field_writer()); diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt index ac9ae5519f7..60d3a05502a 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt @@ -32,6 +32,7 @@ vespa_add_library(searchcore_bmcluster STATIC storage_api_rpc_bm_feed_handler.cpp storage_reply_error_checker.cpp DEPENDS + searchcore_test searchcore_server searchcore_initializer searchcore_reprocessing diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 3587e8008f2..85fd304f395 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -449,7 +449,6 @@ class MyBmNode : public BmNode int _distributor_mbus_port; int _distributor_rpc_port; int _distributor_status_port; - TransLogServer _tls; vespalib::string _tls_spec; proton::matching::QueryLimiter _query_limiter; vespalib::Clock _clock; @@ -457,6 +456,7 @@ class MyBmNode : public BmNode proton::MemoryConfigStores _config_stores; vespalib::ThreadStackExecutor _summary_executor; proton::MockSharedThreadingService _shared_service; + TransLogServer _tls; proton::DummyDBOwner _document_db_owner; BucketSpace _bucket_space; std::shared_ptr _document_db; @@ -514,7 +514,6 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod _distributor_mbus_port(port_number(base_port, PortBias::DISTRIBUTOR_MBUS_PORT)), _distributor_rpc_port(port_number(base_port, PortBias::DISTRIBUTOR_RPC_PORT)), _distributor_status_port(port_number(base_port, PortBias::DISTRIBUTOR_STATUS_PORT)), - _tls("tls", _tls_listen_port, _base_dir, _file_header_context), _tls_spec(vespalib::make_string("tcp/localhost:%d", _tls_listen_port)), _query_limiter(), _clock(), @@ -522,6 +521,7 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod _config_stores(), _summary_executor(8, 128_Ki), _shared_service(_summary_executor, _summary_executor), + _tls(_shared_service.transport(), "tls", _tls_listen_port, _base_dir, _file_header_context), _document_db_owner(), _bucket_space(document::test::makeBucketSpace(_doc_type_name.getName())), _document_db(), @@ -568,7 +568,7 @@ MyBmNode::create_document_db(const BmClusterParams& params) vespalib::mkdir(_base_dir + "/" + _doc_type_name.getName(), false); vespalib::string input_cfg = _base_dir + "/" + _doc_type_name.getName() + "/baseconfig"; { - proton::FileConfigManager fileCfg(input_cfg, "", _doc_type_name.getName()); + proton::FileConfigManager fileCfg(_shared_service.transport(), input_cfg, "", _doc_type_name.getName()); fileCfg.saveConfig(*_document_db_config, 1); } config::DirSpec spec(input_cfg + "/config-1"); @@ -590,7 +590,7 @@ MyBmNode::create_document_db(const BmClusterParams& params) std::make_shared(), tuneFileDocDB, HwInfo()); mgr.forwardConfig(bootstrap_config); - mgr.nextGeneration(0ms); + mgr.nextGeneration(_shared_service.transport(), 0ms); _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name, _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner, _shared_service, *_persistence_engine, _tls, diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp index 2dc749ce26b..7054c7077c8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.cpp @@ -10,12 +10,12 @@ using vespalib::makeLambdaTask; namespace proton { -DiskMemUsageSampler::DiskMemUsageSampler(const std::string &path_in, const Config &config) +DiskMemUsageSampler::DiskMemUsageSampler(FNET_Transport & transport, const std::string &path_in, const Config &config) : _filter(config.hwInfo), _path(path_in), _sampleInterval(60s), _lastSampleTime(vespalib::steady_clock::now()), - _periodicTimer(), + _periodicTimer(std::make_unique(transport)), _lock(), _transient_usage_providers() { @@ -30,12 +30,11 @@ DiskMemUsageSampler::~DiskMemUsageSampler() void DiskMemUsageSampler::setConfig(const Config &config) { - _periodicTimer.reset(); + _periodicTimer->reset(); _filter.setConfig(config.filterConfig); _sampleInterval = config.sampleInterval; sampleUsage(); _lastSampleTime = vespalib::steady_clock::now(); - _periodicTimer = std::make_unique(); vespalib::duration maxInterval = std::min(vespalib::duration(1s), _sampleInterval); _periodicTimer->scheduleAtFixedRate(makeLambdaTask([this]() { if (_filter.acceptWriteOperation() && (vespalib::steady_clock::now() < (_lastSampleTime + _sampleInterval))) { diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index 74e14bc65de..7475282e718 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -5,6 +5,8 @@ #include #include "disk_mem_usage_filter.h" +class FNET_Transport; + namespace vespalib { class ScheduledExecutor; } namespace proton { @@ -51,7 +53,8 @@ public: } }; - DiskMemUsageSampler(const std::string &path_in, + DiskMemUsageSampler(FNET_Transport & transport, + const std::string &path_in, const Config &config); ~DiskMemUsageSampler(); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index a30aa916896..078e5a69fb3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -15,7 +15,6 @@ #include "replay_throttling_policy.h" #include #include -#include #include #include #include @@ -220,7 +219,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _feedHandler(std::make_unique(_writeService, tlsSpec, docTypeName, *this, _writeFilter, *this, tlsWriterFactory)), _subDBs(*this, *this, *_feedHandler, _docTypeName, _writeService, shared_service.warmup(), fileHeaderContext, metricsWireService, getMetrics(), queryLimiter, clock, _configMutex, _baseDir, hwInfo), - _maintenanceController(_writeService.master(), shared_service.shared(), _refCount, _docTypeName), + _maintenanceController(shared_service.transport(), _writeService.master(), shared_service.shared(), _refCount, _docTypeName), _jobTrackers(), _calc(), _metricsUpdater(_subDBs, _writeService, _jobTrackers, *_sessionManager, _writeFilter, *_feedHandler) diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index b7c319d46f5..54561ec6304 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -272,18 +272,19 @@ build_alloc_config(const ProtonConfig& proton_config, const vespalib::string& do distribution_config.redundancy, distribution_config.searchablecopies); } -vespalib::string resolve_file(config::RpcFileAcquirer &fileAcquirer, vespalib::TimeBox &timeBox, - const vespalib::string &desc, const vespalib::string &fileref) +vespalib::string +resolve_file(config::RpcFileAcquirer &fileAcquirer, vespalib::TimeBox &timeBox, + const vespalib::string &desc, const vespalib::string &fileref) { vespalib::string filePath; - LOG(info, "Waiting for file acquirer (%s, ref='%s')", desc.c_str(), fileref.c_str()); + LOG(debug, "Waiting for file acquirer (%s, ref='%s')", desc.c_str(), fileref.c_str()); while (timeBox.hasTimeLeft() && (filePath == "")) { filePath = fileAcquirer.wait_for(fileref, timeBox.timeLeft()); if (filePath == "") { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(100ms); } } - LOG(info, "Got file path from file acquirer: '%s' (%s, ref='%s')", filePath.c_str(), desc.c_str(), fileref.c_str()); + LOG(debug, "Got file path from file acquirer: '%s' (%s, ref='%s')", filePath.c_str(), desc.c_str(), fileref.c_str()); if (filePath == "") { throw config::ConfigTimeoutException(fmt("could not get file path from file acquirer for %s (ref=%s)", desc.c_str(), fileref.c_str())); @@ -294,7 +295,7 @@ vespalib::string resolve_file(config::RpcFileAcquirer &fileAcquirer, vespalib::T } void -DocumentDBConfigManager::update(const ConfigSnapshot &snapshot) +DocumentDBConfigManager::update(FNET_Transport & transport, const ConfigSnapshot &snapshot) { using RankProfilesConfigSP = DocumentDBConfig::RankProfilesConfigSP; using RankingConstantsConfigSP = std::shared_ptr; @@ -355,7 +356,7 @@ DocumentDBConfigManager::update(const ConfigSnapshot &snapshot) const vespalib::string &spec = _bootstrapConfig->getFiledistributorrpcConfig().connectionspec; RankingConstants::Vector constants; if (spec != "") { - config::RpcFileAcquirer fileAcquirer(spec); + config::RpcFileAcquirer fileAcquirer(transport, spec); vespalib::TimeBox timeBox(5*60, 5); for (const RankingConstantsConfig::Constant &rc : newRankingConstantsConfig->constant) { auto desc = fmt("name='%s', type='%s'", rc.name.c_str(), rc.type.c_str()); @@ -371,7 +372,7 @@ DocumentDBConfigManager::update(const ConfigSnapshot &snapshot) const vespalib::string &spec = _bootstrapConfig->getFiledistributorrpcConfig().connectionspec; RankingExpressions expressions; if (spec != "") { - config::RpcFileAcquirer fileAcquirer(spec); + config::RpcFileAcquirer fileAcquirer(transport, spec); vespalib::TimeBox timeBox(5*60, 5); for (const RankingExpressionsConfig::Expression &rc : newRankingExpressionsConfig->expression) { auto desc = fmt("name='%s'", rc.name.c_str()); @@ -387,7 +388,7 @@ DocumentDBConfigManager::update(const ConfigSnapshot &snapshot) const vespalib::string &spec = _bootstrapConfig->getFiledistributorrpcConfig().connectionspec; OnnxModels::Vector models; if (spec != "") { - config::RpcFileAcquirer fileAcquirer(spec); + config::RpcFileAcquirer fileAcquirer(transport, spec); vespalib::TimeBox timeBox(5*60, 5); for (const OnnxModelsConfig::Model &rc : newOnnxModelsConfig->model) { auto desc = fmt("name='%s'", rc.name.c_str()); @@ -496,12 +497,12 @@ DocumentDBConfigHelper::DocumentDBConfigHelper(const DirSpec &spec, const vespal DocumentDBConfigHelper::~DocumentDBConfigHelper() = default; bool -DocumentDBConfigHelper::nextGeneration(std::chrono::milliseconds timeoutInMillis) +DocumentDBConfigHelper::nextGeneration(FNET_Transport & transport, vespalib::duration timeout) { - ConfigSnapshot snapshot(_retriever->getBootstrapConfigs(timeoutInMillis)); + ConfigSnapshot snapshot(_retriever->getBootstrapConfigs(timeout)); if (snapshot.empty()) return false; - _mgr.update(snapshot); + _mgr.update(transport, snapshot); return true; } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h index ad5959d551b..74f58bca9a9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.h @@ -5,6 +5,8 @@ #include "documentdbconfig.h" #include +class FNET_Transport; + namespace config { class ConfigRetriever; class DirSpec; @@ -38,7 +40,7 @@ private: public: DocumentDBConfigManager(const vespalib::string &configId, const vespalib::string &docTypeName); ~DocumentDBConfigManager(); - void update(const config::ConfigSnapshot & snapshot); + void update(FNET_Transport & transport, const config::ConfigSnapshot & snapshot); DocumentDBConfig::SP getConfig() const; @@ -56,7 +58,7 @@ public: DocumentDBConfigHelper(const config::DirSpec &spec, const vespalib::string &docTypeName); ~DocumentDBConfigHelper(); - bool nextGeneration(std::chrono::milliseconds timeoutInMillis); + bool nextGeneration(FNET_Transport & transport, vespalib::duration timeout); DocumentDBConfig::SP getConfig() const; void forwardConfig(const std::shared_ptr & config); private: diff --git a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp index 34369a0803e..8ffa6c0a84d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.cpp @@ -207,10 +207,12 @@ getFileList(const vespalib::string &snapDir) } -FileConfigManager::FileConfigManager(const vespalib::string &baseDir, +FileConfigManager::FileConfigManager(FNET_Transport & transport, + const vespalib::string &baseDir, const vespalib::string &configId, const vespalib::string &docTypeName) - : _baseDir(baseDir), + : _transport(transport), + _baseDir(baseDir), _configId(configId), _docTypeName(docTypeName), _info(baseDir), @@ -358,7 +360,7 @@ FileConfigManager::loadConfig(const DocumentDBConfig ¤tSnapshot, search::S bucketspaces,currentSnapshot.getTuneFileDocumentDBSP(), sampler.hwInfo()); dbc.forwardConfig(bootstrap); - dbc.nextGeneration(0ms); + dbc.nextGeneration(_transport, 0ms); loadedSnapshot = dbc.getConfig(); loadedSnapshot->setConfigId(_configId); diff --git a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.h b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.h index 417ab6cf394..cca68d7d84c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/fileconfigmanager.h @@ -11,11 +11,12 @@ namespace proton { class FileConfigManager : public ConfigStore { private: - vespalib::string _baseDir; - vespalib::string _configId; - vespalib::string _docTypeName; - search::IndexMetaInfo _info; - ProtonConfigSP _protonConfig; + FNET_Transport & _transport; + vespalib::string _baseDir; + vespalib::string _configId; + vespalib::string _docTypeName; + search::IndexMetaInfo _info; + ProtonConfigSP _protonConfig; public: /** @@ -24,9 +25,8 @@ public: * @param baseDir the directory in which config snapshots are saved and loaded. * @param configId the configId that was used to subscribe to config that is later handled by this manager. */ - FileConfigManager(const vespalib::string &baseDir, - const vespalib::string &configId, - const vespalib::string &docTypeName); + FileConfigManager(FNET_Transport & transport, const vespalib::string &baseDir, + const vespalib::string &configId, const vespalib::string &docTypeName); ~FileConfigManager() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h index dfa48cb8d1a..a467f9bc320 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +class FNET_Transport; + namespace vespalib { class ISequencedTaskExecutor; class ThreadExecutor; @@ -14,7 +16,7 @@ namespace proton { */ class ISharedThreadingService { public: - virtual ~ISharedThreadingService() {} + virtual ~ISharedThreadingService() = default; /** * Returns the executor used for warmup (e.g. index warmup). @@ -44,6 +46,11 @@ public: * Returns an InvokeService intended for regular wakeup calls. */ virtual vespalib::InvokeService & invokeService() = 0; + + /** + * Returns a shared transport object that can be utilized by multiple services. + */ + virtual FNET_Transport & transport() = 0; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index fa4bae8f01b..0df211b5a0b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -39,7 +40,8 @@ isRunnable(const MaintenanceJobRunner & job, const Executor * master) { } -MaintenanceController::MaintenanceController(ISyncableThreadService& masterThread, +MaintenanceController::MaintenanceController(FNET_Transport & transport, + ISyncableThreadService& masterThread, vespalib::Executor& shared_executor, MonitoredRefCount& refCount, const DocTypeName& docTypeName) @@ -49,7 +51,7 @@ MaintenanceController::MaintenanceController(ISyncableThreadService& masterThrea _readySubDB(), _remSubDB(), _notReadySubDB(), - _periodicTimer(), + _periodicTimer(std::make_unique(transport)), _config(), _state(State::INITIALIZING), _docTypeName(docTypeName), @@ -93,7 +95,7 @@ MaintenanceController::killJobs() // Called by master write thread assert(_masterThread.isCurrentThread()); LOG(debug, "killJobs(): threadId=%zu", (size_t)FastOS_Thread::GetCurrentThreadId()); - _periodicTimer.reset(); + _periodicTimer->reset(); // No need to take _jobsLock as modification of _jobs also happens in master write thread. for (auto &job : _jobs) { job->stop(); // Make sure no more tasks are added to the executor @@ -172,7 +174,7 @@ MaintenanceController::restart() if (!getStarted() || getStopping() || !_readySubDB.valid()) { return; } - _periodicTimer = std::make_unique(); + _periodicTimer->reset(); addJobsToPeriodicTimer(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 086f5a36404..8e5bb8d860c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -42,7 +42,7 @@ public: using UP = std::unique_ptr; enum class State {INITIALIZING, STARTED, PAUSED, STOPPING}; - MaintenanceController(ISyncableThreadService& masterThread, vespalib::Executor& shared_executor, + MaintenanceController(FNET_Transport & transport, ISyncableThreadService& masterThread, vespalib::Executor& shared_executor, vespalib::MonitoredRefCount& refCount, const DocTypeName& docTypeName); ~MaintenanceController(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 642f8746587..3aaf00a6541 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #ifdef __linux__ #include #endif @@ -202,7 +203,7 @@ Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string & cluster Proton::Proton(const config::ConfigUri & configUri, const vespalib::string &progName, - std::chrono::milliseconds subscribeTimeout) + vespalib::duration subscribeTimeout) : IProtonConfigurerOwner(), search::engine::MonitorServer(), IDocumentDBOwner(), @@ -210,6 +211,8 @@ Proton::Proton(const config::ConfigUri & configUri, IPersistenceEngineOwner(), ComponentConfigProducer(), _cpu_util(), + _threadPool(std::make_unique(128_Ki)), + _transport(std::make_unique(TransportConfig(1))), _configUri(configUri), _mutex(), _metricsHook(std::make_unique(*this)), @@ -235,12 +238,11 @@ Proton::Proton(const config::ConfigUri & configUri, _executor(1, 128_Ki), _protonDiskLayout(), _protonConfigurer(_executor, *this, _protonDiskLayout), - _protonConfigFetcher(configUri, _protonConfigurer, subscribeTimeout), + _protonConfigFetcher(*_transport, configUri, _protonConfigurer, subscribeTimeout), _shared_service(), _compile_cache_executor_binding(), _queryLimiter(), _clock(1.0/vespalib::getVespaTimerHz()), - _threadPool(128_Ki), _distributionKey(-1), _isInitializing(true), _abortInit(false), @@ -258,10 +260,11 @@ Proton::init() { assert( ! _initStarted && ! _initComplete ); _initStarted = true; - if (_threadPool.NewThread(_clock.getRunnable(), nullptr) == nullptr) { + _transport->Start(_threadPool.get()); + if (_threadPool->NewThread(_clock.getRunnable(), nullptr) == nullptr) { throw IllegalStateException("Failed starting thread for the cheap clock"); } - _protonConfigFetcher.start(); + _protonConfigFetcher.start(*_threadPool); auto configSnapshot = _protonConfigurer.getPendingConfigSnapshot(); assert(configSnapshot); auto bootstrapConfig = configSnapshot->getBootstrapConfig(); @@ -279,7 +282,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _diskMemUsageSampler = std::make_unique(protonConfig.basedir, + _shared_service = std::make_unique(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), *_transport); + _diskMemUsageSampler = std::make_unique(_shared_service->transport(), protonConfig.basedir, diskMemUsageSamplerConfig(protonConfig, hwInfo)); _tls = std::make_unique(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); @@ -313,7 +317,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _protonDiskLayout = std::make_unique(protonConfig.basedir, protonConfig.tlsspec); vespalib::chdir(protonConfig.basedir); vespalib::alloc::MmapFileAllocatorFactory::instance().setup(protonConfig.basedir + "/swapdirs"); - _tls->start(hwInfo.cpu().cores()); + _tls->start(*_transport, hwInfo.cpu().cores()); _flushEngine = std::make_unique(std::make_shared(_tls->getTransLogServer()), strategy, flush.maxconcurrent, vespalib::from_s(flush.idleinterval)); _metricsEngine->addExternalMetrics(_summaryEngine->getMetrics()); @@ -328,7 +332,6 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) protonConfig.visit.ignoremaxbytes); vespalib::string fileConfigId; - _shared_service = std::make_unique(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu())); _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { @@ -476,6 +479,7 @@ Proton::~Proton() _compile_cache_executor_binding.reset(); _shared_service.reset(); _clock.stop(); + _transport->ShutDown(true); LOG(debug, "Explicit destructor done"); } @@ -598,9 +602,8 @@ Proton::addDocumentDB(const document::DocumentType &docType, vespalib::string db_dir = config.basedir + "/documents/" + docTypeName.toString(); vespalib::mkdir(db_dir, false); // Assume parent is created. - auto config_store = std::make_unique(db_dir + "/config", - documentDBConfig->getConfigId(), - docTypeName.getName()); + auto config_store = std::make_unique(*_transport, db_dir + "/config", + documentDBConfig->getConfigId(), docTypeName.getName()); config_store->setProtonConfig(bootstrapConfig->getProtonConfigSP()); if (!initializeThreads) { // If configured value for initialize threads was 0, or we diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 61e381cfebb..0490b1e00b7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -82,6 +82,8 @@ private: }; vespalib::CpuUtil _cpu_util; + std::unique_ptr _threadPool; + std::unique_ptr _transport; const config::ConfigUri _configUri; mutable std::shared_mutex _mutex; std::unique_ptr _metricsHook; @@ -110,7 +112,6 @@ private: vespalib::eval::CompileCache::ExecutorBinding::UP _compile_cache_executor_binding; matching::QueryLimiter _queryLimiter; vespalib::Clock _clock; - FastOS_ThreadPool _threadPool; uint32_t _distributionKey; bool _isInitializing; bool _abortInit; @@ -146,7 +147,7 @@ public: Proton(const config::ConfigUri & configUri, const vespalib::string &progName, - std::chrono::milliseconds subscribeTimeout); + vespalib::duration subscribeTimeout); ~Proton() override; /** @@ -182,7 +183,7 @@ public: const std::shared_ptr &documentDBConfig, InitializeThreads initializeThreads); metrics::MetricManager & getMetricManager(); - FastOS_ThreadPool & getThreadPool() { return _threadPool; } + FastOS_ThreadPool & getThreadPool() { return *_threadPool; } bool triggerFlush(); bool prepareRestart(); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp index 485f14b8fb9..f956a370ffa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp @@ -20,13 +20,13 @@ using namespace std::chrono_literals; namespace proton { -ProtonConfigFetcher::ProtonConfigFetcher(const config::ConfigUri & configUri, IProtonConfigurer &owner, std::chrono::milliseconds subscribeTimeout) - : _bootstrapConfigManager(configUri.getConfigId()), +ProtonConfigFetcher::ProtonConfigFetcher(FNET_Transport & transport, const config::ConfigUri & configUri, IProtonConfigurer &owner, vespalib::duration subscribeTimeout) + : _transport(transport), + _bootstrapConfigManager(configUri.getConfigId()), _retriever(_bootstrapConfigManager.createConfigKeySet(), configUri.getContext(), subscribeTimeout), _owner(owner), _mutex(), _dbManagerMap(), - _threadPool(128_Ki, 1), _oldDocumentTypeRepos(), _currentDocumentTypeRepo() { @@ -60,8 +60,7 @@ ProtonConfigFetcher::pruneManagerMap(const BootstrapConfig::SP & config) ConfigKeySet set; lock_guard guard(_mutex); - for (size_t i = 0; i < protonConfig.documentdb.size(); i++) { - const ProtonConfig::Documentdb & ddb(protonConfig.documentdb[i]); + for (const ProtonConfig::Documentdb & ddb : protonConfig.documentdb) { DocTypeName docTypeName(ddb.inputdoctypename); LOG(debug, "Document type(%s), configid(%s)", ddb.inputdoctypename.c_str(), ddb.configid.c_str()); DocumentDBConfigManager::SP mgr; @@ -83,7 +82,7 @@ ProtonConfigFetcher::updateDocumentDBConfigs(const BootstrapConfig::SP & bootstr lock_guard guard(_mutex); for (auto & entry : _dbManagerMap) { entry.second->forwardConfig(bootstrapConfig); - entry.second->update(snapshot); + entry.second->update(_transport, snapshot); } } @@ -162,10 +161,10 @@ ProtonConfigFetcher::getGeneration() const } void -ProtonConfigFetcher::start() +ProtonConfigFetcher::start(FastOS_ThreadPool & threadPool) { fetchConfigs(); - if (_threadPool.NewThread(this, nullptr) == nullptr) { + if (threadPool.NewThread(this, nullptr) == nullptr) { throw vespalib::IllegalStateException( "Failed starting thread for proton config fetcher"); } @@ -176,7 +175,6 @@ ProtonConfigFetcher::close() { if (!_retriever.isClosed()) { _retriever.close(); - _threadPool.Close(); } } @@ -189,7 +187,7 @@ ProtonConfigFetcher::rememberDocumentTypeRepo(std::shared_ptr; - ProtonConfigFetcher(const config::ConfigUri & configUri, IProtonConfigurer &owner, std::chrono::milliseconds subscribeTimeout); + ProtonConfigFetcher(FNET_Transport & transport, const config::ConfigUri & configUri, IProtonConfigurer &owner, vespalib::duration subscribeTimeout); ~ProtonConfigFetcher() override; /** * Get the current config generation. @@ -36,7 +36,7 @@ public: /** * Start config fetcher, callbacks may come from now on. */ - void start(); + void start(FastOS_ThreadPool & threadPool); /** * Shutdown config fetcher, ensuring that no more callbacks arrive @@ -47,20 +47,19 @@ public: private: typedef std::map DBManagerMap; - using Clock = std::chrono::steady_clock; - using TimePoint = std::chrono::time_point; - using OldDocumentTypeRepo = std::pair>; + using OldDocumentTypeRepo = std::pair>; + using lock_guard = std::lock_guard; - BootstrapConfigManager _bootstrapConfigManager; - config::ConfigRetriever _retriever; - IProtonConfigurer & _owner; - mutable std::mutex _mutex; // Protects maps - using lock_guard = std::lock_guard; - DBManagerMap _dbManagerMap; + FNET_Transport & _transport; + BootstrapConfigManager _bootstrapConfigManager; + config::ConfigRetriever _retriever; + IProtonConfigurer & _owner; + + mutable std::mutex _mutex; // Protects maps + DBManagerMap _dbManagerMap; - FastOS_ThreadPool _threadPool; - std::deque _oldDocumentTypeRepos; + std::deque _oldDocumentTypeRepos; std::shared_ptr _currentDocumentTypeRepo; void fetchConfigs(); diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index 80199c8c50c..1f06b29518b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -6,6 +6,8 @@ #include #include #include +#include +#include using vespalib::CpuUsage; @@ -17,8 +19,10 @@ namespace proton { using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor; -SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg) - : _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)), +SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport & transport) + : + _transport(transport), + _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)), _shared(std::make_shared(cfg.shared_threads(), 128_Ki, cfg.shared_task_limit(), proton_shared_executor)), _field_writer(), diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h index cd0e6d71402..349dcc2d0ce 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h @@ -16,15 +16,16 @@ namespace proton { class SharedThreadingService : public ISharedThreadingService { private: using Registration = std::unique_ptr; - vespalib::ThreadStackExecutor _warmup; + FNET_Transport & _transport; + vespalib::ThreadStackExecutor _warmup; std::shared_ptr _shared; std::unique_ptr _field_writer; - vespalib::InvokeServiceImpl _invokeService; - std::vector _invokeRegistrations; + vespalib::InvokeServiceImpl _invokeService; + std::vector _invokeRegistrations; public: - SharedThreadingService(const SharedThreadingServiceConfig& cfg); - ~SharedThreadingService(); + SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport & transport); + ~SharedThreadingService() override; std::shared_ptr shared_raw() { return _shared; } void sync_all_executors(); @@ -33,6 +34,7 @@ public: vespalib::ThreadExecutor& shared() override { return *_shared; } vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); } vespalib::InvokeService & invokeService() override { return _invokeService; } + FNET_Transport & transport() override { return _transport; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt index 2c5dbe0317b..e6eda87f66e 100644 --- a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt @@ -7,8 +7,10 @@ vespa_add_library(searchcore_test STATIC clusterstatehandler.cpp documentdb_config_builder.cpp dummy_feed_view.cpp + mock_shared_threading_service.cpp userdocumentsbuilder.cpp threading_service_observer.cpp + transport_helper.cpp DEPENDS searchcore_fconfig ) diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp new file mode 100644 index 00000000000..e976bf1d726 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp @@ -0,0 +1,17 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "mock_shared_threading_service.h" + +namespace proton { + +MockSharedThreadingService::MockSharedThreadingService(ThreadExecutor& warmup_in, ThreadExecutor& shared_in) + : _warmup(warmup_in), + _shared(shared_in), + _invokeService(10ms), + _transportMgr() +{ +} + +MockSharedThreadingService::~MockSharedThreadingService() = default; + +} diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h index 74965c15cd4..d33afcbec3d 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "transport_helper.h" #include #include @@ -8,21 +9,21 @@ namespace proton { class MockSharedThreadingService : public ISharedThreadingService { private: - vespalib::ThreadExecutor& _warmup; - vespalib::ThreadExecutor& _shared; - vespalib::InvokeServiceImpl _invokeService; + using ThreadExecutor = vespalib::ThreadExecutor; + ThreadExecutor & _warmup; + ThreadExecutor & _shared; + vespalib::InvokeServiceImpl _invokeService; + TransportMgr _transportMgr; public: - MockSharedThreadingService(vespalib::ThreadExecutor& warmup_in, - vespalib::ThreadExecutor& shared_in) - : _warmup(warmup_in), - _shared(shared_in), - _invokeService(10ms) - {} - vespalib::ThreadExecutor& warmup() override { return _warmup; } - vespalib::ThreadExecutor& shared() override { return _shared; } + MockSharedThreadingService(ThreadExecutor& warmup_in, + ThreadExecutor& shared_in); + ~MockSharedThreadingService() override; + ThreadExecutor& warmup() override { return _warmup; } + ThreadExecutor& shared() override { return _shared; } vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; } vespalib::InvokeService & invokeService() override { return _invokeService; } + FNET_Transport & transport() override { return _transportMgr.transport(); } }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp new file mode 100644 index 00000000000..10623fb0726 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp @@ -0,0 +1,21 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "transport_helper.h" +#include +#include +#include + +namespace proton { + +TransportMgr::TransportMgr() + : _threadPool(std::make_unique(64_Ki)), + _transport(std::make_unique()) +{ + _transport->Start(_threadPool.get()); +} + +TransportMgr::~TransportMgr() { + _transport->ShutDown(true); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h new file mode 100644 index 00000000000..09cca2dd007 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h @@ -0,0 +1,26 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include + +class FastOS_ThreadPool; +class FNET_Transport; + +namespace proton { + +/** + * Helper class contain a FNET_Transport object for use in tests. + **/ +class TransportMgr { +private: + std::unique_ptr _threadPool; + std::unique_ptr _transport; + +public: + TransportMgr(); + ~TransportMgr(); + FNET_Transport & transport() { return *_transport; } + FastOS_ThreadPool & threadPool() { return *_threadPool; } +}; + +} -- cgit v1.2.3