diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-23 09:14:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-23 09:14:03 +0100 |
commit | 9e315f9567a20b6a5d1c9b505628c547f85cfdc9 (patch) | |
tree | da6ca17402cb5522d6fc34e1353ade85b6d5936f | |
parent | 6646b37c9c36793054e94881c52b5c675c0734fe (diff) |
Revert "Revert "Revert "Revert "Use common tranport for TlsClient""""
43 files changed, 254 insertions, 203 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index 40431b90e27..88f9938cc0a 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -385,7 +385,7 @@ public: _threadPool(64_Ki), _transport(), _server(_transport, _bopts.tlsName, _bopts.listenPort, _bopts.tlsDir, _fileHeader), - _client(vespalib::make_string("tcp/localhost:%d", _bopts.listenPort)) + _client(_transport, vespalib::make_string("tcp/localhost:%d", _bopts.listenPort)) { _transport.Start(&_threadPool); } diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp index b597bc18cc5..0e9eb926514 100644 --- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp +++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp @@ -2,7 +2,6 @@ #include <vespa/vespalib/testkit/testapp.h> -#include <vespa/document/config/documenttypes_config_fwd.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/searchcore/proton/attribute/attribute_writer.h> #include <vespa/searchcore/proton/attribute/attributemanager.h> @@ -12,7 +11,6 @@ #include <vespa/searchcore/proton/index/indexmanager.h> #include <vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h> #include <vespa/searchcore/proton/server/searchable_doc_subdb_configurer.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/fast_access_doc_subdb_configurer.h> #include <vespa/searchcore/proton/server/summaryadapter.h> #include <vespa/searchcore/proton/server/attribute_writer_factory.h> @@ -23,11 +21,13 @@ #include <vespa/searchcore/proton/test/documentdb_config_builder.h> #include <vespa/searchcore/proton/test/mock_summary_adapter.h> #include <vespa/searchcore/proton/test/mock_gid_to_lid_change_handler.h> +#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/transactionlog/nosyncproxy.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/threadstackexecutor.h> using namespace config; using namespace document; @@ -85,9 +85,8 @@ ViewPtrs::~ViewPtrs() = default; struct ViewSet { IndexManagerDummyReconfigurer _reconfigurer; - DummyFileHeaderContext _fileHeaderContext; - vespalib::ThreadStackExecutor _sharedExecutor; - ExecutorThreadingService _writeService; + DummyFileHeaderContext _fileHeaderContext; + TransportAndExecutorService _service; SearchableFeedView::SerialNum serialNum; std::shared_ptr<const DocumentTypeRepo> repo; DocTypeName _docTypeName; @@ -114,8 +113,7 @@ struct ViewSet ViewSet::ViewSet() : _reconfigurer(), _fileHeaderContext(), - _sharedExecutor(1, 0x10000), - _writeService(_sharedExecutor), + _service(1), serialNum(1), repo(createRepo()), _docTypeName(DOC_TYPE), @@ -188,10 +186,10 @@ Fixture::initViewSet(ViewSet &views) using IndexConfig = proton::index::IndexConfig; auto matchers = std::make_shared<Matchers>(_clock, _queryLimiter, _constantValueRepo); auto indexMgr = make_shared<IndexManager>(BASE_DIR, IndexConfig(searchcorespi::index::WarmupConfig(), 2, 0), Schema(), 1, - views._reconfigurer, views._writeService, _summaryExecutor, + views._reconfigurer, views._service.write(), _summaryExecutor, TuneFileIndexManager(), TuneFileAttributes(), views._fileHeaderContext); auto attrMgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), views._fileHeaderContext, - views._writeService.attributeFieldWriter(), views._writeService.shared(), views._hwInfo); + views._service.write().attributeFieldWriter(), views._service.write().shared(), views._hwInfo); auto summaryMgr = make_shared<SummaryManager> (_summaryExecutor, search::LogDocumentStore::Config(), search::GrowStrategy(), BASE_DIR, views._docTypeName, TuneFileSummary(), views._fileHeaderContext,views._noTlSyncer, search::IBucketizer::SP()); @@ -217,7 +215,7 @@ Fixture::initViewSet(ViewSet &views) views.repo, _pendingLidsForCommit, *views._gidToLidChangeHandler, - views._writeService), + views._service.write()), SearchableFeedView::PersistentParams(views.serialNum, views.serialNum, views._docTypeName, 0u, SubDbType::READY), FastAccessFeedView::Context(attrWriter, views._docIdLimit), @@ -274,21 +272,19 @@ MyFastAccessFeedView::~MyFastAccessFeedView() = default; struct FastAccessFixture { - vespalib::ThreadStackExecutor _sharedExecutor; - ExecutorThreadingService _writeService; - MyFastAccessFeedView _view; + TransportAndExecutorService _service; + MyFastAccessFeedView _view; FastAccessDocSubDBConfigurer _configurer; FastAccessFixture() - : _sharedExecutor(1, 0x10000), - _writeService(_sharedExecutor), - _view(_writeService), + : _service(1), + _view(_service.write()), _configurer(_view._feedView, std::make_unique<AttributeWriterFactory>(), "test") { vespalib::rmdir(BASE_DIR, true); vespalib::mkdir(BASE_DIR); } ~FastAccessFixture() { - _writeService.shutdown(); + _service.shutdown(); } }; diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index 6e20d30fb36..27636324835 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -18,7 +18,6 @@ #include <vespa/searchcore/proton/server/fast_access_document_retriever.h> #include <vespa/searchcore/proton/server/i_document_subdb_owner.h> #include <vespa/searchcore/proton/server/igetserialnum.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/minimal_document_retriever.h> #include <vespa/searchcore/proton/server/searchabledocsubdb.h> #include <vespa/searchcore/proton/server/document_subdb_initializer.h> @@ -288,9 +287,8 @@ struct MyConfigSnapshot template <typename Traits> struct FixtureBase { - TransportMgr _transport; - ThreadStackExecutor _summaryExecutor; - ExecutorThreadingService _writeService; + TransportAndExecutorService _service; + typename Traits::Config _cfg; std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB; BucketDBHandler _bucketDBHandler; @@ -301,15 +299,13 @@ struct FixtureBase typename Traits::SubDB _subDb; IFeedView::SP _tmpFeedView; FixtureBase() - : _transport(), - _summaryExecutor(1, 64_Ki), - _writeService(_summaryExecutor), + : _service(1), _cfg(), _bucketDB(std::make_shared<bucketdb::BucketDBOwner>()), _bucketDBHandler(*_bucketDB), - _ctx(_writeService, _bucketDB, _bucketDBHandler), + _ctx(_service.write(), _bucketDB, _bucketDBHandler), _baseSchema(), - _snapshot(std::make_unique<MyConfigSnapshot>(_transport.transport(), _baseSchema, Traits::ConfigDir::dir())), + _snapshot(std::make_unique<MyConfigSnapshot>(_service.transport(), _baseSchema, Traits::ConfigDir::dir())), _baseDir(BASE_DIR + "/" + SUB_NAME, BASE_DIR), _subDb(_cfg._cfg, _ctx._ctx), _tmpFeedView() @@ -317,8 +313,8 @@ struct FixtureBase init(); } ~FixtureBase() { - _writeService.master().execute(makeLambdaTask([this]() { _subDb.close(); })); - _writeService.shutdown(); + _service.write().master().execute(makeLambdaTask([this]() { _subDb.close(); })); + _service.shutdown(); } void setBucketStateCalculator(const std::shared_ptr<IBucketStateCalculator> & calc) { vespalib::Gate gate; @@ -327,11 +323,11 @@ struct FixtureBase } template <typename FunctionType> void runInMasterAndSync(FunctionType func) { - proton::test::runInMasterAndSync(_writeService, func); + proton::test::runInMasterAndSync(_service.write(), func); } template <typename FunctionType> void runInMaster(FunctionType func) { - proton::test::runInMaster(_writeService, func); + proton::test::runInMaster(_service.write(), func); } void init() { DocumentSubDbInitializer::SP task = @@ -349,7 +345,7 @@ struct FixtureBase runInMasterAndSync([&]() { performReconfig(serialNum, reconfigSchema, reconfigConfigDir); }); } void performReconfig(SerialNum serialNum, const Schema &reconfigSchema, const vespalib::string &reconfigConfigDir) { - auto newCfg = std::make_unique<MyConfigSnapshot>(_transport.transport(), reconfigSchema, reconfigConfigDir); + auto newCfg = std::make_unique<MyConfigSnapshot>(_service.transport(), reconfigSchema, reconfigConfigDir); DocumentDBConfig::ComparisonResult cmpResult; cmpResult.attributesChanged = true; cmpResult.documenttypesChanged = true; diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt index 721f2207213..ba7db5dd377 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_executor_threading_service_test_app TEST SOURCES executor_threading_service_test.cpp DEPENDS + searchcore_test searchcore_server GTest::GTest ) diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index 8d7e842bc89..934dcdb36e3 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -2,6 +2,7 @@ #include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/threading_service_config.h> +#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> @@ -20,17 +21,18 @@ to_concrete_type(ISequencedTaskExecutor& exec) class ExecutorThreadingServiceTest : public ::testing::Test { public: - vespalib::ThreadStackExecutor shared_executor; + TransportAndExecutor _transport; std::unique_ptr<ISequencedTaskExecutor> field_writer_executor; std::unique_ptr<ExecutorThreadingService> service; ExecutorThreadingServiceTest() - : shared_executor(1, 1000), + : _transport(1), field_writer_executor(SequencedTaskExecutor::create(my_field_writer_executor, 3, 200)), service() { } void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) { - service = std::make_unique<ExecutorThreadingService>(shared_executor, + service = std::make_unique<ExecutorThreadingService>(_transport.shared(), + _transport.transport(), field_writer_executor.get(), nullptr, ThreadingServiceConfig::make(indexing_threads, shared_field_writer)); diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 9a8d8bad60e..0f1b4412c4f 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -22,7 +22,6 @@ #include <vespa/searchcore/proton/server/configstore.h> #include <vespa/document/util/feed_reject_helper.h> #include <vespa/searchcore/proton/server/ddbstate.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/feedhandler.h> #include <vespa/searchcore/proton/server/i_feed_handler_owner.h> #include <vespa/searchcore/proton/server/ireplayconfig.h> @@ -409,11 +408,9 @@ struct MyTlsWriter : TlsWriter { struct FeedHandlerFixture { DummyFileHeaderContext _fileHeaderContext; - TransportMgr _transport; + TransportAndExecutorService _service; TransLogServer tls; vespalib::string tlsSpec; - vespalib::ThreadStackExecutor sharedExecutor; - ExecutorThreadingService writeService; SchemaContext schema; MyOwner owner; MyResourceWriteFilter writeFilter; @@ -426,11 +423,9 @@ struct FeedHandlerFixture FeedHandler handler; FeedHandlerFixture() : _fileHeaderContext(), - _transport(), - tls(_transport.transport(), "mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)), + _service(1), + tls(_service.transport(), "mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)), tlsSpec("tcp/localhost:9016"), - sharedExecutor(1, 0x10000), - writeService(sharedExecutor), schema(), owner(), _state(), @@ -438,7 +433,7 @@ struct FeedHandlerFixture feedView(schema.getRepo(), schema.getDocType()), _bucketDB(), _bucketDBHandler(_bucketDB), - handler(writeService, tlsSpec, schema.getDocType(), owner, + handler(_service.write(), tlsSpec, schema.getDocType(), owner, writeFilter, replayConfig, tls, &tls_writer) { _state.enterLoadState(); @@ -449,15 +444,15 @@ struct FeedHandlerFixture } ~FeedHandlerFixture() { - writeService.shutdown(); + _service.shutdown(); } template <class FunctionType> inline void runAsMaster(FunctionType &&function) { - writeService.master().execute(makeLambdaTask(std::move(function))); + _service.write().master().execute(makeLambdaTask(std::move(function))); syncMaster(); } void syncMaster() { - writeService.master().sync(); + _service.write().master().sync(); } }; diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index 824a9273404..ba68d47ec22 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -5,7 +5,6 @@ #include <vespa/searchcore/proton/test/bucketfactory.h> #include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/index/i_index_writer.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/isummaryadapter.h> #include <vespa/searchcore/proton/server/matchview.h> #include <vespa/searchcore/proton/server/searchable_feed_view.h> @@ -20,6 +19,7 @@ #include <vespa/searchcore/proton/test/mock_summary_adapter.h> #include <vespa/searchcore/proton/test/thread_utils.h> #include <vespa/searchcore/proton/test/threading_service_observer.h> +#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchlib/attribute/attributefactory.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/destructor_callbacks.h> @@ -497,8 +497,7 @@ struct FixtureBase DocumentMetaStoreContext::SP _dmscReal; test::DocumentMetaStoreContextObserver::SP _dmsc; ParamsContext pc; - vespalib::ThreadStackExecutor _sharedExecutor; - ExecutorThreadingService _writeServiceReal; + TransportAndExecutorService _service; test::ThreadingServiceObserver _writeService; SerialNum serial; std::shared_ptr<MyGidToLidChangeHandler> _gidToLidChangeHandler; @@ -690,9 +689,8 @@ FixtureBase::FixtureBase() _dmscReal(std::make_shared<DocumentMetaStoreContext>(std::make_shared<bucketdb::BucketDBOwner>())), _dmsc(std::make_shared<test::DocumentMetaStoreContextObserver>(*_dmscReal)), pc(sc._builder->getDocumentType().getName(), "fileconfig_test"), - _sharedExecutor(1, 0x10000), - _writeServiceReal(_sharedExecutor), - _writeService(_writeServiceReal), + _service(1), + _writeService(_service.write()), serial(0), _gidToLidChangeHandler(std::make_shared<MyGidToLidChangeHandler>()) { @@ -700,7 +698,7 @@ FixtureBase::FixtureBase() } FixtureBase::~FixtureBase() { - _writeServiceReal.shutdown(); + _service.shutdown(); } void diff --git a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp index d51ea25f2f5..2de24ee7803 100644 --- a/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp +++ b/searchcore/src/tests/proton/documentdb/fileconfigmanager/fileconfigmanager_test.cpp @@ -127,7 +127,7 @@ addConfigsThatAreNotSavedToDisk(const DocumentDBConfig &cfg) return builder.build(); } -TEST_FF("requireThatConfigCanBeSavedAndLoaded", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) +TEST_FF("requireThatConfigCanBeSavedAndLoaded", Transport(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) { DocumentDBConfig::SP fullCfg = addConfigsThatAreNotSavedToDisk(*f2); @@ -140,7 +140,7 @@ TEST_FF("requireThatConfigCanBeSavedAndLoaded", TransportMgr(), DocumentDBConfig assertEqualSnapshot(*f2, *esnap); } -TEST_FF("requireThatConfigCanBeSerializedAndDeserialized", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) +TEST_FF("requireThatConfigCanBeSerializedAndDeserialized", Transport(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) { saveBaseConfigSnapshot(f1.transport(), *f2, 30); nbostream stream; @@ -161,7 +161,7 @@ TEST_FF("requireThatConfigCanBeSerializedAndDeserialized", TransportMgr(), Docum EXPECT_EQUAL("dummy", fsnap->getDocTypeName()); } -TEST_FF("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) +TEST_FF("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", Transport(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) { saveBaseConfigSnapshot(f1.transport(), *f2, 70); EXPECT_FALSE(vespalib::unlink("out/config-70/extraconfigs.dat")); @@ -173,7 +173,7 @@ TEST_FF("requireThatConfigCanBeLoadedWithoutExtraConfigsDataFile", TransportMgr( } -TEST_FF("requireThatVisibilityDelayIsPropagated", TransportMgr(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) +TEST_FF("requireThatVisibilityDelayIsPropagated", Transport(), DocumentDBConfig::SP(makeBaseConfigSnapshot(f1.transport()))) { saveBaseConfigSnapshot(f1.transport(), *f2, 80); DocumentDBConfig::SP esnap(makeEmptyConfigSnapshot()); diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 39a5c69376c..0144e260ffa 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -349,7 +349,7 @@ public: test::DiskMemUsageNotifier _diskMemUsageNotifier; BucketCreateNotifier _bucketCreateNotifier; MonitoredRefCount _refCount; - TransportMgr _transport; + Transport _transport; MaintenanceController _mc; MaintenanceControllerFixture(); diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/CMakeLists.txt index 3cdad86e9d7..6d94c066d20 100644 --- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_storeonlyfeedview_test_app TEST SOURCES storeonlyfeedview_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_feedoperation searchcore_documentmetastore diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp index b1d7ee1d0a8..acd33ab749d 100644 --- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp @@ -3,7 +3,6 @@ #include <vespa/document/base/documentid.h> #include <vespa/document/datatype/datatype.h> #include <vespa/searchcommon/common/schema.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/server/putdonecontext.h> #include <vespa/searchcore/proton/server/removedonecontext.h> #include <vespa/searchcore/proton/server/storeonlyfeedview.h> @@ -12,9 +11,11 @@ #include <vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h> #include <vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h> #include <vespa/searchcore/proton/test/mock_summary_adapter.h> +#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchcore/proton/test/thread_utils.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/log/log.h> @@ -211,8 +212,7 @@ struct FixtureBase { std::atomic<int> heartbeatCount; std::atomic<int> outstandingMoveOps; DocumentMetaStore::SP metaStore; - vespalib::ThreadStackExecutor sharedExecutor; - ExecutorThreadingService writeService; + TransportAndExecutorService _service; std::shared_ptr<PendingLidTrackerBase> pendingLidsForCommit; typename FeedViewType::UP feedview; SerialNum serial_num; @@ -226,8 +226,7 @@ struct FixtureBase { DocumentMetaStore::getFixedName(), search::GrowStrategy(), subDbType)), - sharedExecutor(1, 0x10000), - writeService(sharedExecutor), + _service(1), pendingLidsForCommit(std::make_shared<PendingLidTracker>()), feedview(), serial_num(2u) @@ -235,7 +234,7 @@ struct FixtureBase { StoreOnlyFeedView::PersistentParams params(0, 0, DocTypeName("foo"), subdb_id, subDbType); metaStore->constructFreeList(); ISummaryAdapter::SP adapter = std::make_shared<MySummaryAdapter>(removeCount, putCount, heartbeatCount); - feedview = std::make_unique<FeedViewType>(adapter, metaStore, writeService, + feedview = std::make_unique<FeedViewType>(adapter, metaStore, _service.write(), params, pendingLidsForCommit, outstandingMoveOps); } @@ -263,11 +262,11 @@ struct FixtureBase { template <typename FunctionType> void runInMasterAndSync(FunctionType func) { - test::runInMasterAndSync(writeService, func); + test::runInMasterAndSync(_service.write(), func); } template <typename FunctionType> void runInMaster(FunctionType func) { - test::runInMaster(writeService, func); + test::runInMaster(_service.write(), func); } void force_commit() { diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp index 8a2e2084978..a30408a9301 100644 --- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp @@ -3,11 +3,13 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/searchcore/proton/documentmetastore/i_store.h> #include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/test/thread_utils.h> #include <vespa/searchcore/proton/test/threading_service_observer.h> +#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/gate.h> #include <vespa/log/log.h> LOG_SETUP("lidreusedelayer_test"); @@ -117,20 +119,17 @@ class Fixture { public: using LidReuseDelayer = documentmetastore::LidReuseDelayer; - vespalib::ThreadStackExecutor _sharedExecutor; - ExecutorThreadingService _writeServiceReal; + TransportAndExecutorService _service; test::ThreadingServiceObserver _writeService; MyMetaStore _store; std::unique_ptr<LidReuseDelayer> _lidReuseDelayer; Fixture() - : _sharedExecutor(1, 0x10000), - _writeServiceReal(_sharedExecutor), - _writeService(_writeServiceReal), + : _service(1), + _writeService(_service.write()), _store(), _lidReuseDelayer(std::make_unique<LidReuseDelayer>(_writeService, _store)) - { - } + { } ~Fixture() { commit(); diff --git a/searchcore/src/tests/proton/index/CMakeLists.txt b/searchcore/src/tests/proton/index/CMakeLists.txt index 62a631dc26e..313dd5e0457 100644 --- a/searchcore/src/tests/proton/index/CMakeLists.txt +++ b/searchcore/src/tests/proton/index/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_executable(searchcore_indexmanager_test_app TEST SOURCES indexmanager_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_index searchcore_flushengine @@ -17,6 +18,7 @@ vespa_add_executable(searchcore_fusionrunner_test_app TEST SOURCES fusionrunner_test.cpp DEPENDS + searchcore_test searchcore_server searchcore_index searchcore_pcommon diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp index ae85211fe24..38314abd7e5 100644 --- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp +++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp @@ -2,7 +2,7 @@ #include <vespa/fastos/file.h> #include <vespa/searchcore/proton/index/indexmanager.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> +#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchcorespi/index/fusionrunner.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/searchlib/common/flush_token.h> @@ -17,6 +17,7 @@ #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/util/size_literals.h> #include <set> using document::Document; @@ -66,8 +67,7 @@ class Test : public vespalib::TestApp { FixedSourceSelector::UP _selector; FusionSpec _fusion_spec; DummyFileHeaderContext _fileHeaderContext; - vespalib::ThreadStackExecutor _sharedExecutor; - ExecutorThreadingService _threadingService; + TransportAndExecutorService _service; IndexManager::MaintainerOperations _ops; void setUp(); @@ -85,21 +85,21 @@ class Test : public vespalib::TestApp { void requireThatFusionCanBeStopped(); public: - Test() - : _fusion_runner(), - _selector(), - _fusion_spec(), - _fileHeaderContext(), - _sharedExecutor(1, 0x10000), - _threadingService(_sharedExecutor), - _ops(_fileHeaderContext, - TuneFileIndexManager(), 0, - _threadingService) - {} - ~Test() {} + Test(); + ~Test(); int Main() override; }; +Test::Test() + : _fusion_runner(), + _selector(), + _fusion_spec(), + _fileHeaderContext(), + _service(1), + _ops(_fileHeaderContext,TuneFileIndexManager(), 0, _service.write()) +{ } +Test::~Test() = default; + int Test::Main() { @@ -180,8 +180,8 @@ void Test::createIndex(const string &dir, uint32_t id, bool fusion) { Schema schema = getSchema(); DocBuilder doc_builder(schema); MemoryIndex memory_index(schema, MockFieldLengthInspector(), - _threadingService.indexFieldInverter(), - _threadingService.indexFieldWriter()); + _service.write().indexFieldInverter(), + _service.write().indexFieldWriter()); addDocument(doc_builder, memory_index, *_selector, id, id + 0, term); addDocument(doc_builder, memory_index, *_selector, id, id + 1, "bar"); addDocument(doc_builder, memory_index, *_selector, id, id + 2, "baz"); diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 1e33482b055..6cca28f5b43 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -2,7 +2,7 @@ #include <vespa/fastos/file.h> #include <vespa/searchcore/proton/index/indexmanager.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> +#include <vespa/searchcore/proton/test/transport_helper.h> #include <vespa/searchcorespi/index/index_manager_stats.h> #include <vespa/searchcorespi/index/indexcollection.h> #include <vespa/searchcorespi/index/indexflushtarget.h> @@ -24,7 +24,6 @@ #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/time.h> #include <set> #include <thread> @@ -53,7 +52,6 @@ using search::memoryindex::FieldIndexCollection; using search::queryeval::Source; using std::set; using std::string; -using vespalib::ThreadStackExecutor; using vespalib::makeLambdaTask; using std::chrono::duration_cast; @@ -108,8 +106,7 @@ struct IndexManagerTest : public ::testing::Test { SerialNum _serial_num; IndexManagerDummyReconfigurer _reconfigurer; DummyFileHeaderContext _fileHeaderContext; - vespalib::ThreadStackExecutor _sharedExecutor; - ExecutorThreadingService _writeService; + TransportAndExecutorService _service; std::unique_ptr<IndexManager> _index_manager; Schema _schema; DocBuilder _builder; @@ -118,8 +115,7 @@ struct IndexManagerTest : public ::testing::Test { : _serial_num(0), _reconfigurer(), _fileHeaderContext(), - _sharedExecutor(1, 0x10000), - _writeService(_sharedExecutor), + _service(1), _index_manager(), _schema(getSchema()), _builder(_schema) @@ -130,13 +126,13 @@ struct IndexManagerTest : public ::testing::Test { } ~IndexManagerTest() { - _writeService.shutdown(); + _service.shutdown(); } template <class FunctionType> inline void runAsMaster(FunctionType &&function) { vespalib::Gate gate; - _writeService.master().execute(makeLambdaTask([&gate,function = std::move(function)]() { + _service.write().master().execute(makeLambdaTask([&gate,function = std::move(function)]() { function(); gate.countDown(); })); @@ -145,7 +141,7 @@ struct IndexManagerTest : public ::testing::Test { template <class FunctionType> inline void runAsIndex(FunctionType &&function) { vespalib::Gate gate; - _writeService.index().execute(makeLambdaTask([&gate,function = std::move(function)]() { + _service.write().index().execute(makeLambdaTask([&gate,function = std::move(function)]() { function(); gate.countDown(); })); @@ -210,7 +206,7 @@ IndexManagerTest::resetIndexManager() { _index_manager.reset(); _index_manager = std::make_unique<IndexManager>(index_dir, IndexConfig(), getSchema(), 1, - _reconfigurer, _writeService, _sharedExecutor, + _reconfigurer, _service.write(), _service.shared(), TuneFileIndexManager(), TuneFileAttributes(), _fileHeaderContext); } diff --git a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp index b4193b0e0b2..34bc9ea13ed 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp +++ b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp @@ -64,8 +64,8 @@ struct DoctypeFixture { }; struct ConfigTestFixture { - const std::string configId; - TransportMgr transport; + const std::string configId; + Transport transport; ProtonConfigBuilder protonBuilder; DocumenttypesConfigBuilder documenttypesBuilder; FiledistributorrpcConfigBuilder filedistBuilder; diff --git a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp index 08b9a55746b..05d993a33e0 100644 --- a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp +++ b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp @@ -16,7 +16,7 @@ using search::transactionlog::client::TransLogClient; using search::transactionlog::TransLogServer; using proton::DocTypeName; using proton::ProtonDiskLayout; -using proton::TransportMgr; +using proton::Transport; static constexpr unsigned int tlsPort = 9018; @@ -31,7 +31,7 @@ struct FixtureBase struct DiskLayoutFixture { DummyFileHeaderContext _fileHeaderContext; - TransportMgr _transport; + Transport _transport; TransLogServer _tls; vespalib::string _tlsSpec; ProtonDiskLayout _diskLayout; @@ -45,7 +45,7 @@ struct DiskLayoutFixture { } } void createDomains(const std::set<vespalib::string> &domains) { - TransLogClient tlc(_tlsSpec); + TransLogClient tlc(_transport.transport(), _tlsSpec); for (const auto &domain : domains) { ASSERT_TRUE(tlc.create(domain)); } @@ -53,7 +53,7 @@ struct DiskLayoutFixture { std::set<vespalib::string> listDomains() { std::vector<vespalib::string> domainVector; - TransLogClient tlc(_tlsSpec); + TransLogClient tlc(_transport.transport(), _tlsSpec); ASSERT_TRUE(tlc.listDomains(domainVector)); std::set<vespalib::string> domains; for (const auto &domain : domainVector) { @@ -97,7 +97,7 @@ DiskLayoutFixture::DiskLayoutFixture() _transport(), _tls(_transport.transport(), "tls", tlsPort, baseDir, _fileHeaderContext), _tlsSpec(vespalib::make_string("tcp/localhost:%u", tlsPort)), - _diskLayout(baseDir, _tlsSpec) + _diskLayout(_transport.transport(), baseDir, _tlsSpec) { } diff --git a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp index 9469c8b055f..0c06d27c916 100644 --- a/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp +++ b/searchcore/src/tests/proton/server/disk_mem_usage_sampler/disk_mem_usage_sampler_test.cpp @@ -38,7 +38,7 @@ public: }; struct DiskMemUsageSamplerTest : public ::testing::Test { - TransportMgr transport; + Transport transport; std::unique_ptr<DiskMemUsageSampler> sampler; DiskMemUsageSamplerTest() : transport(), diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp index 1b8c8d8491b..8f8200486d7 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp +++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp @@ -48,7 +48,7 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores class SharedThreadingServiceTest : public ::testing::Test { public: - TransportMgr transport; + Transport transport; std::unique_ptr<SharedThreadingService> service; SharedThreadingServiceTest() : transport(), diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 078e5a69fb3..cf6193555e5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -188,7 +188,8 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. _writeServiceConfig(configSnapshot->get_threading_service_config()), - _writeService(shared_service.shared(), shared_service.field_writer(), &shared_service.invokeService(), _writeServiceConfig, indexing_thread_stack_size), + _writeService(shared_service.shared(), shared_service.transport(), shared_service.field_writer(), + &shared_service.invokeService(), _writeServiceConfig, indexing_thread_stack_size), _initializeThreads(std::move(initializeThreads)), _initConfigSnapshot(), _initConfigSerialNum(0u), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index ca6b3d9ba0f..a268a6eac87 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -40,17 +40,19 @@ VESPA_THREAD_STACK_TAG(field_writer_executor) } -ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor &sharedExecutor, uint32_t num_treads) - : ExecutorThreadingService(sharedExecutor, nullptr, nullptr, ThreadingServiceConfig::make(num_treads)) +ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor &sharedExecutor, FNET_Transport & transport, uint32_t num_treads) + : ExecutorThreadingService(sharedExecutor, transport, nullptr, nullptr, ThreadingServiceConfig::make(num_treads)) {} -ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExecutor, - vespalib::ISequencedTaskExecutor* field_writer, +ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedExecutor, + FNET_Transport & transport, + vespalib::ISequencedTaskExecutor * field_writer, vespalib::InvokeService * invokerService, - const ThreadingServiceConfig& cfg, + const ThreadingServiceConfig & cfg, uint32_t stackSize) : _sharedExecutor(sharedExecutor), + _transport(transport), _masterExecutor(1, stackSize, CpuUsage::wrap(master_executor, CpuUsage::Category::WRITE)), _shared_field_writer(cfg.shared_field_writer()), _master_task_limit(cfg.master_task_limit()), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 43d546927c2..77bb9042198 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -21,6 +21,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService private: using Registration = std::unique_ptr<vespalib::IDestructorCallback>; vespalib::Executor & _sharedExecutor; + FNET_Transport & _transport; vespalib::ThreadStackExecutor _masterExecutor; ThreadingServiceConfig::SharedFieldWriterExecutor _shared_field_writer; std::atomic<uint32_t> _master_task_limit; @@ -42,9 +43,10 @@ public: /** * Convenience constructor used in unit tests. */ - ExecutorThreadingService(vespalib::Executor& sharedExecutor, uint32_t num_treads = 1); + ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport & transport, uint32_t num_treads = 1); ExecutorThreadingService(vespalib::Executor& sharedExecutor, + FNET_Transport & transport, vespalib::ISequencedTaskExecutor* field_writer, vespalib::InvokeService * invokeService, const ThreadingServiceConfig& cfg, @@ -79,6 +81,7 @@ public: vespalib::ISequencedTaskExecutor &indexFieldInverter() override; vespalib::ISequencedTaskExecutor &indexFieldWriter() override; vespalib::ISequencedTaskExecutor &attributeFieldWriter() override; + FNET_Transport &transport() override { return _transport; } ExecutorThreadingServiceStats getStats(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 16bd2537813..914753d567d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -408,7 +408,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _owner(owner), _writeFilter(writeFilter), _replayConfig(replayConfig), - _tlsMgr(tlsSpec, docTypeName.getName()), + _tlsMgr(writeService.transport(), tlsSpec, docTypeName.getName()), _tlsWriterfactory(tlsWriterFactory), _tlsMgrWriter(), _tlsWriter(tlsWriter), diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 3aaf00a6541..0f20b0a7b47 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -314,7 +314,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) strategy = std::make_shared<SimpleFlush>(); break; } - _protonDiskLayout = std::make_unique<ProtonDiskLayout>(protonConfig.basedir, protonConfig.tlsspec); + _protonDiskLayout = std::make_unique<ProtonDiskLayout>(*_transport, protonConfig.basedir, protonConfig.tlsspec); vespalib::chdir(protonConfig.basedir); vespalib::alloc::MmapFileAllocatorFactory::instance().setup(protonConfig.basedir + "/swapdirs"); _tls->start(*_transport, hwInfo.cpu().cores()); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp index ed0e9ad79c5..b1a7bd87067 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_fetcher.cpp @@ -6,7 +6,6 @@ #include "i_proton_configurer.h" #include <vespa/config/common/exceptions.h> #include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/size_literals.h> #include <thread> #include <cassert> diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp index 1c155abfd6b..0e6318158a2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp @@ -65,8 +65,9 @@ void scanDir(const vespalib::string documentsDir, DocumentDBDirScan &dirs) } -ProtonDiskLayout::ProtonDiskLayout(const vespalib::string &baseDir, const vespalib::string &tlsSpec) - : _baseDir(baseDir), +ProtonDiskLayout::ProtonDiskLayout(FNET_Transport & transport, const vespalib::string &baseDir, const vespalib::string &tlsSpec) + : _transport(transport), + _baseDir(baseDir), _tlsSpec(tlsSpec) { vespalib::mkdir(getDocumentsDir(_baseDir), true); @@ -83,7 +84,7 @@ ProtonDiskLayout::remove(const DocTypeName &docTypeName) vespalib::string removedDir(documentsDir + "/" + getRemovedName(name)); vespalib::rename(normalDir, removedDir, false, false); vespalib::File::sync(documentsDir); - TransLogClient tlc(_tlsSpec); + TransLogClient tlc(_transport, _tlsSpec); if (!tlc.remove(name)) { LOG(fatal, "Failed to remove tls domain %s", name.c_str()); LOG_ABORT("Failed to remove tls domain"); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.h b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.h index befce5396a1..e779b388d7c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.h @@ -5,6 +5,8 @@ #include "i_proton_disk_layout.h" #include <vespa/vespalib/stllike/string.h> +class FNET_Transport; + namespace proton { /** @@ -14,11 +16,12 @@ namespace proton { class ProtonDiskLayout : public IProtonDiskLayout { private: - const vespalib::string _baseDir; - const vespalib::string _tlsSpec; + FNET_Transport & _transport; + const vespalib::string _baseDir; + const vespalib::string _tlsSpec; public: - ProtonDiskLayout(const vespalib::string &baseDir, const vespalib::string &tlsSpec); + ProtonDiskLayout(FNET_Transport & transport, const vespalib::string &baseDir, const vespalib::string &tlsSpec); ~ProtonDiskLayout() override; void remove(const DocTypeName &docTypeName) override; void initAndPruneUnused(const std::set<DocTypeName> &docTypeNames) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp index 9b81904c695..58c8c5f92c2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp @@ -25,8 +25,8 @@ TransactionLogManager::doLogReplayComplete(const vespalib::string &domainName, } -TransactionLogManager::TransactionLogManager(const vespalib::string &tlsSpec, const vespalib::string &domainName) - : TransactionLogManagerBase(tlsSpec, domainName), +TransactionLogManager::TransactionLogManager(FNET_Transport & transport, const vespalib::string &tlsSpec, const vespalib::string &domainName) + : TransactionLogManagerBase(transport, tlsSpec, domainName), _visitor() { } diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h index 7634fbb2e8c..40e2b9faeb2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h @@ -24,7 +24,7 @@ public: * @param tlsSpec the spec of the transaction log server. * @param domainName the name of the domain this manager should handle. **/ - TransactionLogManager(const vespalib::string &tlsSpec, const vespalib::string &domainName); + TransactionLogManager(FNET_Transport & transport, const vespalib::string &tlsSpec, const vespalib::string &domainName); ~TransactionLogManager() override; /** diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp index 5fe54219868..55e82051de6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp @@ -12,9 +12,10 @@ using search::transactionlog::client::Visitor; namespace proton { -TransactionLogManagerBase::TransactionLogManagerBase( - const vespalib::string &tlsSpec, const vespalib::string &domainName) : - _tlc(std::make_unique<TransLogClient>(tlsSpec)), +TransactionLogManagerBase::TransactionLogManagerBase(FNET_Transport & transport, + const vespalib::string &tlsSpec, + const vespalib::string &domainName) : + _tlc(std::make_unique<TransLogClient>(transport, tlsSpec)), _tlcSession(), _domainName(domainName), _replayLock(), diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h index 5c795ecf5ec..16d0dec1409 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h @@ -7,6 +7,8 @@ #include <mutex> #include <condition_variable> +class FNET_Transport; + namespace search::transactionlog::client { class TransLogClient; class Session; @@ -58,7 +60,8 @@ public: * @param tlsSpec the spec of the transaction log server. * @param domainName the name of the domain this manager should handle. **/ - TransactionLogManagerBase(const vespalib::string &tlsSpec, + TransactionLogManagerBase(FNET_Transport & transport, + const vespalib::string &tlsSpec, const vespalib::string &domainName); virtual ~TransactionLogManagerBase(); diff --git a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt index e6eda87f66e..28f705716bc 100644 --- a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt @@ -12,5 +12,6 @@ vespa_add_library(searchcore_test STATIC threading_service_observer.cpp transport_helper.cpp DEPENDS + searchcore_server searchcore_fconfig ) diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_document_sub_db.h b/searchcore/src/vespa/searchcore/proton/test/dummy_document_sub_db.h index 03ddcf3605b..6632fbc856a 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_document_sub_db.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_document_sub_db.h @@ -1,8 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "transport_helper.h" #include <vespa/searchcore/proton/server/idocumentsubdb.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> #include <vespa/searchcore/proton/docsummary/isummarymanager.h> #include <vespa/searchcorespi/index/iindexmanager.h> #include <vespa/searchcore/proton/documentmetastore/documentmetastorecontext.h> @@ -21,15 +21,14 @@ namespace proton::test { struct DummyDocumentSubDb : public IDocumentSubDB { using IIndexManager = searchcorespi::IIndexManager; - uint32_t _subDbId; - DocumentMetaStoreContext _metaStoreCtx; - ISummaryManager::SP _summaryManager; - IIndexManager::SP _indexManager; - ISummaryAdapter::SP _summaryAdapter; - IIndexWriter::SP _indexWriter; - vespalib::ThreadStackExecutor _sharedExecutor; - std::unique_ptr<ExecutorThreadingService> _writeService; - PendingLidTracker _pendingLidTracker; + uint32_t _subDbId; + DocumentMetaStoreContext _metaStoreCtx; + ISummaryManager::SP _summaryManager; + IIndexManager::SP _indexManager; + ISummaryAdapter::SP _summaryAdapter; + IIndexWriter::SP _indexWriter; + mutable TransportAndExecutorService _service; + PendingLidTracker _pendingLidTracker; DummyDocumentSubDb(std::shared_ptr<bucketdb::BucketDBOwner> bucketDB, uint32_t subDbId) : _subDbId(subDbId), @@ -38,8 +37,8 @@ struct DummyDocumentSubDb : public IDocumentSubDB _indexManager(), _summaryAdapter(), _indexWriter(), - _sharedExecutor(1, 0x10000), - _writeService(std::make_unique<ExecutorThreadingService>(_sharedExecutor, 1)) + _service(1), + _pendingLidTracker() { } ~DummyDocumentSubDb() override { } @@ -49,7 +48,7 @@ struct DummyDocumentSubDb : public IDocumentSubDB DocumentSubDbInitializer::UP createInitializer(const DocumentDBConfig &, SerialNum,const index::IndexConfig &) const override { return std::make_unique<DocumentSubDbInitializer> - (const_cast<DummyDocumentSubDb &>(*this), _writeService->master()); + (const_cast<DummyDocumentSubDb &>(*this), _service.write().master()); } void setup(const DocumentSubDbInitializerResult &) override {} void initViews(const DocumentDBConfig &, const proton::matching::SessionManager::SP &) override {} diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp index e976bf1d726..d695f4d8dc7 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp @@ -8,7 +8,7 @@ MockSharedThreadingService::MockSharedThreadingService(ThreadExecutor& warmup_in : _warmup(warmup_in), _shared(shared_in), _invokeService(10ms), - _transportMgr() + _transport() { } diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h index d33afcbec3d..7db208fa3ef 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h @@ -13,7 +13,7 @@ private: ThreadExecutor & _warmup; ThreadExecutor & _shared; vespalib::InvokeServiceImpl _invokeService; - TransportMgr _transportMgr; + Transport _transport; public: MockSharedThreadingService(ThreadExecutor& warmup_in, @@ -23,7 +23,7 @@ public: ThreadExecutor& shared() override { return _shared; } vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; } vespalib::InvokeService & invokeService() override { return _invokeService; } - FNET_Transport & transport() override { return _transportMgr.transport(); } + FNET_Transport & transport() override { return _transport.transport(); } }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index 78a740ec2c3..f728ab5f025 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -49,6 +49,7 @@ public: vespalib::Executor &shared() override { return _shared; } + FNET_Transport & transport() override { return _service.transport(); } vespalib::ISequencedTaskExecutor &indexFieldInverter() override { return _indexFieldInverter; } diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp index 10623fb0726..43e267805da 100644 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp @@ -4,18 +4,52 @@ #include <vespa/fnet/transport.h> #include <vespa/fastos/thread.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/searchcore/proton/server/executorthreadingservice.h> namespace proton { -TransportMgr::TransportMgr() +Transport::Transport() : _threadPool(std::make_unique<FastOS_ThreadPool>(64_Ki)), _transport(std::make_unique<FNET_Transport>()) { _transport->Start(_threadPool.get()); } -TransportMgr::~TransportMgr() { +Transport::~Transport() { + shutdown(); +} + +void +Transport::shutdown() { _transport->ShutDown(true); } +TransportAndExecutor::TransportAndExecutor(size_t num_threads) + : Transport(), + _sharedExecutor(std::make_unique<vespalib::ThreadStackExecutor>(num_threads, 64_Ki)) +{} + +TransportAndExecutor::~TransportAndExecutor() = default; + +void +TransportAndExecutor::shutdown() { + Transport::shutdown(); +} + +TransportAndExecutorService::TransportAndExecutorService(size_t num_threads) + : TransportAndExecutor(num_threads), + _writeService(std::make_unique<ExecutorThreadingService>(shared(), transport())) +{} +TransportAndExecutorService::~TransportAndExecutorService() = default; + +searchcorespi::index::IThreadingService & +TransportAndExecutorService::write() { + return *_writeService; +} + +void TransportAndExecutorService::shutdown() { + _writeService->shutdown(); + TransportAndExecutor::shutdown(); +} } diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h index 09cca2dd007..81610193743 100644 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h @@ -1,26 +1,47 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <memory> +#include <vespa/searchcorespi/index/ithreadingservice.h> class FastOS_ThreadPool; -class FNET_Transport; namespace proton { +class ExecutorThreadingService; + /** * Helper class contain a FNET_Transport object for use in tests. **/ -class TransportMgr { +class Transport { +public: + Transport(); + virtual ~Transport(); + FNET_Transport & transport() { return *_transport; } + FastOS_ThreadPool & threadPool() { return *_threadPool; } + virtual void shutdown(); private: std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; +}; +class TransportAndExecutor : public Transport { public: - TransportMgr(); - ~TransportMgr(); - FNET_Transport & transport() { return *_transport; } - FastOS_ThreadPool & threadPool() { return *_threadPool; } + TransportAndExecutor(size_t num_threads); + ~TransportAndExecutor() override; + vespalib::Executor & shared() { return *_sharedExecutor; } + void shutdown() override; +private: + std::unique_ptr<vespalib::Executor> _sharedExecutor; +}; + +class TransportAndExecutorService : public TransportAndExecutor { +public: + TransportAndExecutorService(size_t num_threads); + ~TransportAndExecutorService() override; + searchcorespi::index::IThreadingService & write(); + void shutdown() override; +private: + std::unique_ptr<ExecutorThreadingService> _writeService; }; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h index 59fe73e0dec..60997d6666b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h +++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h @@ -3,6 +3,8 @@ #include "i_thread_service.h" +class FNET_Transport; + namespace vespalib { class ISequencedTaskExecutor; } namespace searchcorespi::index { @@ -73,6 +75,7 @@ struct IThreadingService virtual IThreadService &index() = 0; virtual vespalib::ThreadExecutor &summary() = 0; virtual vespalib::Executor &shared() = 0; + virtual FNET_Transport &transport() = 0; virtual vespalib::ISequencedTaskExecutor &indexFieldInverter() = 0; virtual vespalib::ISequencedTaskExecutor &indexFieldWriter() = 0; virtual vespalib::ISequencedTaskExecutor &attributeFieldWriter() = 0; diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index afb1eb53417..bf3a269107c 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -482,7 +482,7 @@ createAndFillDomain(const vespalib::string & dir, const vespalib::string & name, DummyFileHeaderContext fileHeaderContext; TLS tlss(dir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000).setEncoding(encoding), 4); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); createDomainTest(tls, name, preExistingDomains); auto s1 = openDomainTest(tls, name); @@ -493,7 +493,7 @@ void verifyDomain(const vespalib::string & dir, const vespalib::string & name) { DummyFileHeaderContext fileHeaderContext; TLS tlss(dir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); auto s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); } @@ -504,7 +504,7 @@ void testVisitOverGeneratedDomain(const vespalib::string & testDir) { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); vespalib::string name("test1"); createDomainTest(tls, name); @@ -522,7 +522,7 @@ testVisitOverPreExistingDomain(const vespalib::string & testDir) { // Depends on Test::testVisitOverGeneratedDomain() DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); vespalib::string name("test1"); auto s1 = openDomainTest(tls, name); @@ -533,7 +533,7 @@ void partialUpdateTest(const vespalib::string & testDir) { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); auto s1 = openDomainTest(tls, "test1"); Session & session = *s1; @@ -612,7 +612,7 @@ TEST("testRemove") { test::DirectoryHandler testDir("testremove"); DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); vespalib::string name("test-delete"); createDomainTest(tls, name); @@ -667,7 +667,7 @@ testSendingAlotOfDataSync(const vespalib::string & testDir) { { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); createDomainTest(tls, MANY, 0); auto s1 = openDomainTest(tls, MANY); @@ -690,7 +690,7 @@ testSendingAlotOfDataSync(const vespalib::string & testDir) { { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); auto s1 = openDomainTest(tls, "many"); SerialNum b(0), e(0); @@ -711,7 +711,7 @@ testSendingAlotOfDataSync(const vespalib::string & testDir) { { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); auto s1 = openDomainTest(tls, MANY); SerialNum b(0), e(0); @@ -739,7 +739,7 @@ void testSendingAlotOfDataAsync(const vespalib::string & testDir) { { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); createDomainTest(tls, MANY, 1); auto s1 = openDomainTest(tls, MANY); fillDomainTest(tlss.tls, MANY, NUM_PACKETS, NUM_ENTRIES); @@ -761,7 +761,7 @@ void testSendingAlotOfDataAsync(const vespalib::string & testDir) { { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir, 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); auto s1 = openDomainTest(tls, MANY); SerialNum b(0), e(0); @@ -796,7 +796,7 @@ TEST("testErase") { { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x80000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); createDomainTest(tls, "erase", 0); auto s1 = openDomainTest(tls, "erase"); @@ -805,7 +805,7 @@ TEST("testErase") { { DummyFileHeaderContext fileHeaderContext; TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); auto s1 = openDomainTest(tls, "erase"); @@ -893,7 +893,7 @@ TEST("testSync") { DummyFileHeaderContext fileHeaderContext; test::DirectoryHandler testDir("test9"); TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); createDomainTest(tls, "sync", 0); auto s1 = openDomainTest(tls, "sync"); @@ -916,7 +916,7 @@ TEST("test truncate on version mismatch") { test::DirectoryHandler testDir("test11"); { TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); createDomainTest(tls, "sync", 0); auto s1 = openDomainTest(tls, "sync"); @@ -937,7 +937,7 @@ TEST("test truncate on version mismatch") { EXPECT_TRUE(f.Close()); { TLS tlss(testDir.getDir(), 18377, ".", fileHeaderContext, createDomainConfig(0x10000)); - TransLogClient tls("tcp/localhost:18377"); + TransLogClient tls(tlss.transport, "tcp/localhost:18377"); auto s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); size_t count(0); @@ -963,7 +963,7 @@ TEST("test truncation after short read") { DummyFileHeaderContext fileHeaderContext; { TLS tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); - TransLogClient tls(tlsspec); + TransLogClient tls(tlss.transport, tlsspec); createDomainTest(tls, domain, 0); auto s1 = openDomainTest(tls, domain); @@ -977,7 +977,7 @@ TEST("test truncation after short read") { EXPECT_EQUAL(2u, countFiles(dir)); { TLS tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); - TransLogClient tls(tlsspec); + TransLogClient tls(tlss.transport, tlsspec); auto s1 = openDomainTest(tls, domain); checkFilledDomainTest(*s1, TOTAL_NUM_ENTRIES); } @@ -990,7 +990,7 @@ TEST("test truncation after short read") { } { TLS tlss(topdir.getDir(), 18377, ".", fileHeaderContext, domainConfig); - TransLogClient tls(tlsspec); + TransLogClient tls(tlss.transport, tlsspec); auto s1 = openDomainTest(tls, domain); checkFilledDomainTest(*s1, TOTAL_NUM_ENTRIES - 1); } diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 3d8379adba2..de0e1b60ca0 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -207,16 +207,16 @@ private: bool addEntry(const Packet::Entry & e); public: - FeederThread(const std::string & tlsSpec, const std::string & domain, + FeederThread(FNET_Transport & transport, const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, uint32_t feedRate, size_t packetSize); ~FeederThread() override; void doRun() override; SerialNumRange getRange() const { return SerialNumRange(1, _lastCommited); } }; -FeederThread::FeederThread(const std::string & tlsSpec, const std::string & domain, +FeederThread::FeederThread(FNET_Transport & transport, const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, uint32_t feedRate, size_t packetSize) - : _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec), _session(), + : _tlsSpec(tlsSpec), _domain(domain), _client(transport, tlsSpec), _session(), _generator(generator), _feedRate(feedRate), _packet(packetSize), _current(1), _lastCommited(1), _timer() {} FeederThread::~FeederThread() = default; @@ -301,10 +301,10 @@ protected: bool _validate; public: - Agent(const std::string & tlsSpec, const std::string & domain, + Agent(FNET_Transport & transport, const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, const std::string & name, uint32_t id, bool validate) : client::Callback(), - _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec), + _tlsSpec(tlsSpec), _domain(domain), _client(transport, tlsSpec), _generator(generator), _name(name), _id(id), _validate(validate) {} ~Agent() override {} @@ -339,9 +339,9 @@ private: SerialNum getNext(); public: - VisitorAgent(const std::string & tlsSpec, const std::string & domain, + VisitorAgent(FNET_Transport & transport, const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, uint32_t id, bool validate) : - Agent(tlsSpec, domain, generator, "VisitorAgent", id, validate), + Agent(transport, tlsSpec, domain, generator, "VisitorAgent", id, validate), _visitor(), _from(0), _to(0), _next(0), _state(IDLE) {} ~VisitorAgent() override = default; void start(SerialNum from, SerialNum to); @@ -470,24 +470,23 @@ private: void makeRandomVisitorVector(); public: - ControllerThread(const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, + ControllerThread(FNET_Transport & transport, const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, uint32_t numVisitors, vespalib::duration visitorInterval, vespalib::duration pruneInterval); ~ControllerThread(); - uint32_t runningVisitors(); std::vector<std::shared_ptr<VisitorAgent> > & getVisitors() { return _visitors; } virtual void doRun() override; }; -ControllerThread::ControllerThread(const std::string & tlsSpec, const std::string & domain, +ControllerThread::ControllerThread(FNET_Transport & transport, const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, uint32_t numVisitors, vespalib::duration visitorInterval, vespalib::duration pruneInterval) - : _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec.c_str()), _session(), + : _tlsSpec(tlsSpec), _domain(domain), _client(transport, tlsSpec.c_str()), _session(), _generator(generator), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval), _pruneInterval(pruneInterval), _pruneTimer(), _begin(0), _end(0), _count(0) { for (uint32_t i = 0; i < numVisitors; ++i) { - _visitors.push_back(std::make_shared<VisitorAgent>(tlsSpec, domain, generator, i, true)); + _visitors.push_back(std::make_shared<VisitorAgent>(transport, tlsSpec, domain, generator, i, true)); } } ControllerThread::~ControllerThread() = default; @@ -704,7 +703,7 @@ TransLogStress::Main() FNET_Transport transport; DummyFileHeaderContext fileHeaderContext; TransLogServer tls(transport, "server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); - TransLogClient client(tlsSpec); + TransLogClient client(transport, tlsSpec); client.create(domain); BufferGenerator bufferGenerator(_cfg.minStrLen, _cfg.maxStrLen); @@ -720,12 +719,12 @@ TransLogStress::Main() // start feeder and controller - FeederThread feeder(tlsSpec, domain, generator, _cfg.feedRate, _cfg.packetSize); + FeederThread feeder(transport, tlsSpec, domain, generator, _cfg.feedRate, _cfg.packetSize); threadPool.NewThread(&feeder); std::this_thread::sleep_for(sleepTime); - ControllerThread controller(tlsSpec, domain, generator, _cfg.numVisitors, _cfg.visitorInterval, _cfg.pruneInterval); + ControllerThread controller(transport, tlsSpec, domain, generator, _cfg.numVisitors, _cfg.visitorInterval, _cfg.pruneInterval); threadPool.NewThread(&controller); // stop feeder and controller diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp index 88b3ddd5242..b4534486e85 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp @@ -46,25 +46,22 @@ struct RpcTask : public vespalib::Executor::Task { } -TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : +TransLogClient::TransLogClient(FNET_Transport & transport, const vespalib::string & rpcTarget) : _executor(std::make_unique<vespalib::ThreadStackExecutor>(1, 128_Ki, translogclient_rpc_callback)), _rpcTarget(rpcTarget), _sessions(), - _threadPool(std::make_unique<FastOS_ThreadPool>(60_Ki)), - _transport(std::make_unique<FNET_Transport>()), - _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), + _supervisor(std::make_unique<FRT_Supervisor>(&transport)), _target(nullptr) { reconnect(); exportRPC(*_supervisor); - _transport->Start(_threadPool.get()); } TransLogClient::~TransLogClient() { disconnect(); _executor->shutdown().sync(); - _transport->ShutDown(true); + _supervisor->GetTransport()->sync(); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h index b22398d76b3..c3dcecf93b3 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h @@ -23,7 +23,7 @@ class Visitor; class TransLogClient : private FRT_Invokable { public: - TransLogClient(const vespalib::string & rpctarget); + TransLogClient(FNET_Transport & transport, const vespalib::string & rpctarget); TransLogClient(const TransLogClient &) = delete; TransLogClient& operator=(const TransLogClient &) = delete; ~TransLogClient() override; @@ -60,8 +60,6 @@ private: SessionMap _sessions; //Brute force lock for subscriptions. For multithread safety. std::mutex _lock; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _supervisor; FRT_Target * _target; }; |