From 0c00a3e980a54f903ed8cdfbca432493f6d694b2 Mon Sep 17 00:00:00 2001 From: Geir Storli Date: Thu, 17 Mar 2022 09:25:04 +0000 Subject: Add BucketExecutor to ISharedThreadingService interface. --- searchcore/src/apps/tests/persistenceconformance_test.cpp | 7 ++----- searchcore/src/tests/proton/docsummary/docsummary.cpp | 5 +---- searchcore/src/tests/proton/documentdb/documentdb_test.cpp | 5 +---- .../shared_threading_service_test.cpp | 13 +++++++++---- searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp | 2 +- .../src/vespa/searchcore/proton/server/documentdb.cpp | 6 ++---- searchcore/src/vespa/searchcore/proton/server/documentdb.h | 2 -- .../searchcore/proton/server/i_shared_threading_service.h | 8 ++++++++ searchcore/src/vespa/searchcore/proton/server/proton.cpp | 6 +++--- .../searchcore/proton/server/shared_threading_service.cpp | 10 ++++++---- .../searchcore/proton/server/shared_threading_service.h | 6 +++++- .../proton/test/mock_shared_threading_service.cpp | 5 ++++- .../searchcore/proton/test/mock_shared_threading_service.h | 8 ++++++-- 13 files changed, 48 insertions(+), 35 deletions(-) (limited to 'searchcore/src') diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 4369666591f..9a7b49065d4 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -176,7 +175,6 @@ private: vespalib::ThreadStackExecutor _summaryExecutor; MockSharedThreadingService _shared_service; TransLogServer _tls; - storage::spi::dummy::DummyBucketExecutor _bucketExecutor; static std::shared_ptr make_proton_config() { ProtonConfigBuilder proton_config; @@ -210,7 +208,7 @@ public: mgr.nextGeneration(_shared_service.transport(), 0ms); return DocumentDB::create(_baseDir, mgr.getConfig(), _tlsSpec, _queryLimiter, docType, bucketSpace, *b->getProtonConfigSP(), const_cast(*this), - _shared_service, _bucketExecutor, _tls, _metricsWireService, + _shared_service, _tls, _metricsWireService, _fileHeaderContext, _config_stores.getConfigStore(docType.toString()), std::make_shared(16, 128_Ki), HwInfo()); } @@ -225,8 +223,7 @@ DocumentDBFactory::DocumentDBFactory(const vespalib::string &baseDir, int tlsLis _metricsWireService(), _summaryExecutor(8, 128_Ki), _shared_service(_summaryExecutor, _summaryExecutor), - _tls(_shared_service.transport(), "tls", tlsListenPort, baseDir, _fileHeaderContext), - _bucketExecutor(2) + _tls(_shared_service.transport(), "tls", tlsListenPort, baseDir, _fileHeaderContext) {} DocumentDBFactory::~DocumentDBFactory() = default; diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index ef8dc17dc0e..6ae9e7e2d52 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -180,7 +179,6 @@ public: vespalib::ThreadStackExecutor _summaryExecutor; MockSharedThreadingService _shared_service; TransLogServer _tls; - storage::spi::dummy::DummyBucketExecutor _bucketExecutor; bool _mkdirOk; matching::QueryLimiter _queryLimiter; DummyWireService _dummy; @@ -200,7 +198,6 @@ public: _summaryExecutor(8, 128_Ki), _shared_service(_summaryExecutor, _summaryExecutor), _tls(_shared_service.transport(), "tmp", 9013, ".", _fileHeaderContext), - _bucketExecutor(2), _mkdirOk(FastOS_File::MakeDirectory("tmpdb")), _queryLimiter(), _dummy(), @@ -227,7 +224,7 @@ public: } _ddb = DocumentDB::create("tmpdb", _configMgr.getConfig(), "tcp/localhost:9013", _queryLimiter, DocTypeName(docTypeName), makeBucketSpace(), *b->getProtonConfigSP(), *this, - _shared_service, _bucketExecutor, _tls, _dummy, _fileHeaderContext, + _shared_service, _tls, _dummy, _fileHeaderContext, std::make_unique(), std::make_shared(16, 128_Ki), _hwInfo), _ddb->start(); diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index 1f3be7511da..0edea2c0c5c 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -122,7 +121,6 @@ struct Fixture : public FixtureBase { vespalib::ThreadStackExecutor _summaryExecutor; MockSharedThreadingService _shared_service; HwInfo _hwInfo; - storage::spi::dummy::DummyBucketExecutor _bucketExecutor; DocumentDB::SP _db; DummyFileHeaderContext _fileHeaderContext; TransLogServer _tls; @@ -146,7 +144,6 @@ Fixture::Fixture(bool file_config) _summaryExecutor(8, 128_Ki), _shared_service(_summaryExecutor, _summaryExecutor), _hwInfo(), - _bucketExecutor(2), _db(), _fileHeaderContext(), _tls(_shared_service.transport(), "tmp", 9014, ".", _fileHeaderContext), @@ -167,7 +164,7 @@ Fixture::Fixture(bool file_config) mgr.nextGeneration(_shared_service.transport(), 0ms); _db = DocumentDB::create(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, DocTypeName("typea"), makeBucketSpace(), - *b->getProtonConfigSP(), _myDBOwner, _shared_service, _bucketExecutor, _tls, _dummy, + *b->getProtonConfigSP(), _myDBOwner, _shared_service, _tls, _dummy, _fileHeaderContext, make_config_store(), std::make_shared(16, 128_Ki), _hwInfo); _db->start(); diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp index 8f8200486d7..a15bd8c67c9 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp +++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp @@ -1,18 +1,20 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include #include #include #include #include +#include #include #include -#include +using ProtonConfig = vespa::config::search::core::ProtonConfig; +using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; using namespace proton; +using storage::spi::dummy::DummyBucketExecutor; using vespalib::ISequencedTaskExecutor; using vespalib::SequencedTaskExecutor; -using ProtonConfig = vespa::config::search::core::ProtonConfig; -using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; ProtonConfig make_proton_config(double concurrency) @@ -49,15 +51,18 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores class SharedThreadingServiceTest : public ::testing::Test { public: Transport transport; + storage::spi::dummy::DummyBucketExecutor bucket_executor; std::unique_ptr service; SharedThreadingServiceTest() : transport(), + bucket_executor(2), service() { } ~SharedThreadingServiceTest() = default; void setup(double concurrency, uint32_t cpu_cores) { service = std::make_unique( - SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)), transport.transport()); + SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores)), + transport.transport(), bucket_executor); } SequencedTaskExecutor* field_writer() { return dynamic_cast(service->field_writer()); diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 86dc38159e8..b42af79d946 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -591,7 +591,7 @@ MyBmNode::create_document_db(const BmClusterParams& params) mgr.nextGeneration(_shared_service.transport(), 0ms); _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _doc_type_name, _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner, - _shared_service, *_persistence_engine, _tls, + _shared_service, _tls, _metrics_wire_service, _file_header_context, _config_stores.getConfigStore(_doc_type_name.toString()), std::make_shared(16, 128_Ki), HwInfo()); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index e4d75085027..60e9dc68573 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -145,7 +145,6 @@ DocumentDB::create(const vespalib::string &baseDir, const ProtonConfig &protonCfg, IDocumentDBOwner &owner, ISharedThreadingService& shared_service, - storage::spi::BucketExecutor &bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, const search::common::FileHeaderContext &fileHeaderContext, @@ -155,7 +154,7 @@ DocumentDB::create(const vespalib::string &baseDir, { return DocumentDB::SP( new DocumentDB(baseDir, std::move(currentSnapshot), tlsSpec, queryLimiter, docTypeName, bucketSpace, - protonCfg, owner, shared_service, bucketExecutor, tlsWriterFactory, + protonCfg, owner, shared_service, tlsWriterFactory, metricsWireService, fileHeaderContext, std::move(config_store), initializeThreads, hwInfo)); } DocumentDB::DocumentDB(const vespalib::string &baseDir, @@ -167,7 +166,6 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, const ProtonConfig &protonCfg, IDocumentDBOwner &owner, ISharedThreadingService& shared_service, - storage::spi::BucketExecutor & bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, const FileHeaderContext &fileHeaderContext, @@ -210,7 +208,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _feedView(), _refCount(), _owner(owner), - _bucketExecutor(bucketExecutor), + _bucketExecutor(shared_service.bucket_executor()), _state(), _dmUsageForwarder(_writeService.master()), _writeFilter(), diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index e3d467fc3c1..9dd9cf6542b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -202,7 +202,6 @@ private: const ProtonConfig &protonCfg, IDocumentDBOwner &owner, ISharedThreadingService& shared_service, - storage::spi::BucketExecutor &bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, const search::common::FileHeaderContext &fileHeaderContext, @@ -232,7 +231,6 @@ public: const ProtonConfig &protonCfg, IDocumentDBOwner &owner, ISharedThreadingService& shared_service, - storage::spi::BucketExecutor & bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, const search::common::FileHeaderContext &fileHeaderContext, diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h index 12377fd25e0..dccea41373f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h @@ -3,6 +3,8 @@ class FNET_Transport; +namespace storage::spi { struct BucketExecutor; } + namespace vespalib { class ISequencedTaskExecutor; class ThreadExecutor; @@ -53,6 +55,12 @@ public: */ virtual FNET_Transport & transport() = 0; + + /** + * Returns the executor for running a BucketTask in the persistence layer above the SPI. + */ + virtual storage::spi::BucketExecutor& bucket_executor() = 0; + /** * Return a very cheap clock. */ diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 9bf6d19231e..0c281ed75c2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -276,8 +276,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) setBucketCheckSumType(protonConfig); setFS4Compression(protonConfig); - _shared_service = std::make_unique(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport); - _diskMemUsageSampler = std::make_unique(_shared_service->transport(), protonConfig.basedir, + _diskMemUsageSampler = std::make_unique(_transport, protonConfig.basedir, diskMemUsageSamplerConfig(protonConfig, hwInfo)); _tls = std::make_unique(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); @@ -324,6 +323,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _diskMemUsageSampler->notifier(), protonConfig.visit.defaultserializedsize, protonConfig.visit.ignoremaxbytes); + _shared_service = std::make_unique( + SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu()), _transport, *_persistenceEngine); vespalib::string fileConfigId; _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); @@ -612,7 +613,6 @@ Proton::addDocumentDB(const document::DocumentType &docType, config, *this, *_shared_service, - *_persistenceEngine, *_tls->getTransLogServer(), *_metricsEngine, _fileHeaderContext, diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index e55282e31e8..c6f8ca923e0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -17,16 +17,18 @@ namespace proton { using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor; -SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport & transport) - : - _transport(transport), +SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg, + FNET_Transport& transport, + storage::spi::BucketExecutor& bucket_executor) + : _transport(transport), _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)), _shared(std::make_shared(cfg.shared_threads(), 128_Ki, - cfg.shared_task_limit(), proton_shared_executor)), + cfg.shared_task_limit(), proton_shared_executor)), _field_writer(), _invokeService(std::max(vespalib::adjustTimeoutByDetectedHz(1ms), cfg.field_writer_config().reactionTime())), _invokeRegistrations(), + _bucket_executor(bucket_executor), _clock(_invokeService.nowRef()) { const auto& fw_cfg = cfg.field_writer_config(); diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h index 463823f10cb..04e30b0f9b3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h @@ -23,9 +23,12 @@ private: std::unique_ptr _field_writer; vespalib::InvokeServiceImpl _invokeService; std::vector _invokeRegistrations; + storage::spi::BucketExecutor& _bucket_executor; vespalib::Clock _clock; public: - SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport & transport); + SharedThreadingService(const SharedThreadingServiceConfig& cfg, + FNET_Transport& transport, + storage::spi::BucketExecutor& bucket_executor); ~SharedThreadingService() override; std::shared_ptr shared_raw() { return _shared; } @@ -36,6 +39,7 @@ public: vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); } vespalib::InvokeService & invokeService() override { return _invokeService; } FNET_Transport & transport() override { return _transport; } + storage::spi::BucketExecutor& bucket_executor() override { return _bucket_executor; } const vespalib::Clock & clock() const override { return _clock; } }; diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp index ca9b89b1e60..e5d42b34370 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.cpp @@ -4,11 +4,14 @@ namespace proton { -MockSharedThreadingService::MockSharedThreadingService(ThreadExecutor& warmup_in, ThreadExecutor& shared_in) +MockSharedThreadingService::MockSharedThreadingService(ThreadExecutor& warmup_in, + ThreadExecutor& shared_in, + size_t num_bucket_executors) : _warmup(warmup_in), _shared(shared_in), _invokeService(10ms), _transport(), + _bucket_executor(num_bucket_executors), _clock(_invokeService.nowRef()) { } diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h index 35c8ee46de5..167d15d70eb 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h @@ -2,9 +2,10 @@ #pragma once #include "transport_helper.h" +#include #include -#include #include +#include namespace proton { @@ -15,16 +16,19 @@ private: ThreadExecutor & _shared; vespalib::InvokeServiceImpl _invokeService; Transport _transport; + storage::spi::dummy::DummyBucketExecutor _bucket_executor; vespalib::Clock _clock; public: MockSharedThreadingService(ThreadExecutor& warmup_in, - ThreadExecutor& shared_in); + ThreadExecutor& shared_in, + size_t num_bucket_executors = 2); ~MockSharedThreadingService() override; ThreadExecutor& warmup() override { return _warmup; } ThreadExecutor& shared() override { return _shared; } vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; } vespalib::InvokeService & invokeService() override { return _invokeService; } FNET_Transport & transport() override { return _transport.transport(); } + storage::spi::BucketExecutor& bucket_executor() override { return _bucket_executor; } const vespalib::Clock & clock() const override { return _clock; } }; -- cgit v1.2.3