diff options
Diffstat (limited to 'searchcore/src')
15 files changed, 273 insertions, 87 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 8238eb21831..be9d394a2b6 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -3,15 +3,19 @@ #include <vespa/vespalib/testkit/testapp.h> #include <tests/proton/common/dummydbowner.h> +#include <vespa/config-attributes.h> +#include <vespa/config-bucketspaces.h> #include <vespa/config-imported-fields.h> +#include <vespa/config-indexschema.h> #include <vespa/config-rank-profiles.h> +#include <vespa/config-summary.h> #include <vespa/config-summarymap.h> #include <vespa/document/base/testdocman.h> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/test/make_bucket_space.h> #include <vespa/fastos/file.h> #include <vespa/persistence/conformancetest/conformancetest.h> #include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> -#include <vespa/document/repo/documenttyperepo.h> -#include <vespa/document/test/make_bucket_space.h> #include <vespa/searchcommon/common/schemaconfigurer.h> #include <vespa/searchcore/proton/common/alloc_config.h> #include <vespa/searchcore/proton/common/hw_info.h> @@ -28,13 +32,10 @@ #include <vespa/searchcore/proton/server/persistencehandlerproxy.h> #include <vespa/searchcore/proton/server/threading_service_config.h> #include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> +#include <vespa/searchcore/proton/test/mock_shared_threading_service.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/searchsummary/config/config-juniperrc.h> -#include <vespa/config-bucketspaces.h> -#include <vespa/config-attributes.h> -#include <vespa/config-indexschema.h> -#include <vespa/config-summary.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/size_literals.h> @@ -174,6 +175,7 @@ private: mutable DummyWireService _metricsWireService; mutable MemoryConfigStores _config_stores; vespalib::ThreadStackExecutor _summaryExecutor; + MockSharedThreadingService _shared_service; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; public: @@ -202,7 +204,7 @@ public: mgr.nextGeneration(0ms); return DocumentDB::create(_baseDir, mgr.getConfig(), _tlsSpec, _queryLimiter, _clock, docType, bucketSpace, *b->getProtonConfigSP(), const_cast<DocumentDBFactory &>(*this), - _summaryExecutor, _summaryExecutor, _bucketExecutor, _tls, _metricsWireService, + _shared_service, _bucketExecutor, _tls, _metricsWireService, _fileHeaderContext, _config_stores.getConfigStore(docType.toString()), std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo()); } @@ -218,6 +220,7 @@ DocumentDBFactory::DocumentDBFactory(const vespalib::string &baseDir, int tlsLis _clock(), _metricsWireService(), _summaryExecutor(8, 128_Ki), + _shared_service(_summaryExecutor, _summaryExecutor), _bucketExecutor(2) {} DocumentDBFactory::~DocumentDBFactory() = default; diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 5c3fe94a8d7..c5a01de6b3b 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -1,45 +1,46 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/proton/common/dummydbowner.h> +#include <vespa/config-bucketspaces.h> #include <vespa/config/helper/configgetter.hpp> +#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/test/make_bucket_space.h> #include <vespa/eval/eval/simple_value.h> #include <vespa/eval/eval/tensor_spec.h> -#include <vespa/eval/eval/value.h> #include <vespa/eval/eval/test/value_compare.h> -#include <vespa/document/repo/documenttyperepo.h> -#include <vespa/document/test/make_bucket_space.h> +#include <vespa/eval/eval/value.h> +#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> #include <vespa/searchcore/proton/attribute/attribute_writer.h> -#include <vespa/searchcore/proton/test/bucketfactory.h> #include <vespa/searchcore/proton/docsummary/docsumcontext.h> #include <vespa/searchcore/proton/docsummary/documentstoreadapter.h> #include <vespa/searchcore/proton/docsummary/summarymanager.h> #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> #include <vespa/searchcore/proton/feedoperation/putoperation.h> +#include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/metrics/metricswireservice.h> #include <vespa/searchcore/proton/server/bootstrapconfig.h> #include <vespa/searchcore/proton/server/documentdb.h> -#include <vespa/searchcore/proton/server/feedhandler.h> #include <vespa/searchcore/proton/server/documentdbconfigmanager.h> +#include <vespa/searchcore/proton/server/feedhandler.h> #include <vespa/searchcore/proton/server/idocumentsubdb.h> #include <vespa/searchcore/proton/server/memoryconfigstore.h> #include <vespa/searchcore/proton/server/searchview.h> #include <vespa/searchcore/proton/server/summaryadapter.h> -#include <vespa/searchcore/proton/matching/querylimiter.h> -#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> -#include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/searchcore/proton/test/bucketfactory.h> +#include <vespa/searchcore/proton/test/mock_shared_threading_service.h> #include <vespa/searchlib/engine/docsumapi.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/tensor/tensor_attribute.h> #include <vespa/searchlib/transactionlog/nosyncproxy.h> #include <vespa/searchlib/transactionlog/translogserver.h> -#include <vespa/vespalib/data/slime/slime.h> -#include <vespa/vespalib/data/slime/json_format.h> #include <vespa/vespalib/data/simple_buffer.h> +#include <vespa/vespalib/data/slime/json_format.h> +#include <vespa/vespalib/data/slime/slime.h> #include <vespa/vespalib/encoding/base64.h> -#include <vespa/vespalib/util/size_literals.h> -#include <vespa/config-bucketspaces.h> #include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/size_literals.h> #include <regex> #include <vespa/log/log.h> @@ -176,6 +177,7 @@ public: DummyFileHeaderContext _fileHeaderContext; TransLogServer _tls; vespalib::ThreadStackExecutor _summaryExecutor; + MockSharedThreadingService _shared_service; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; bool _mkdirOk; matching::QueryLimiter _queryLimiter; @@ -196,6 +198,7 @@ public: _fileHeaderContext(), _tls("tmp", 9013, ".", _fileHeaderContext), _summaryExecutor(8, 128_Ki), + _shared_service(_summaryExecutor, _summaryExecutor), _bucketExecutor(2), _mkdirOk(FastOS_File::MakeDirectory("tmpdb")), _queryLimiter(), @@ -224,7 +227,7 @@ public: } _ddb = DocumentDB::create("tmpdb", _configMgr.getConfig(), "tcp/localhost:9013", _queryLimiter, _clock, DocTypeName(docTypeName), makeBucketSpace(), *b->getProtonConfigSP(), *this, - _summaryExecutor, _summaryExecutor, _bucketExecutor, _tls, _dummy, _fileHeaderContext, + _shared_service, _bucketExecutor, _tls, _dummy, _fileHeaderContext, std::make_unique<MemoryConfigStore>(), std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), _hwInfo), _ddb->start(); diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index a24eeb262ab..b31534c011c 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -1,10 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/proton/common/dummydbowner.h> +#include <vespa/config-bucketspaces.h> #include <vespa/document/datatype/documenttype.h> #include <vespa/document/repo/documenttyperepo.h> -#include <vespa/fastos/file.h> #include <vespa/document/test/make_bucket_space.h> +#include <vespa/fastos/file.h> +#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> #include <vespa/searchcore/proton/attribute/flushableattribute.h> #include <vespa/searchcore/proton/common/statusreport.h> #include <vespa/searchcore/proton/docsummary/summaryflushtarget.h> @@ -22,17 +24,16 @@ #include <vespa/searchcore/proton/server/feedhandler.h> #include <vespa/searchcore/proton/server/fileconfigmanager.h> #include <vespa/searchcore/proton/server/memoryconfigstore.h> -#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> +#include <vespa/searchcore/proton/test/mock_shared_threading_service.h> #include <vespa/searchcorespi/index/indexflushtarget.h> #include <vespa/searchlib/attribute/attribute_read_guard.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/vespalib/data/slime/slime.h> -#include <vespa/vespalib/util/size_literals.h> -#include <vespa/config-bucketspaces.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/size_literals.h> #include <iostream> using namespace cloud::config::filedistribution; @@ -118,6 +119,7 @@ struct Fixture : public FixtureBase { DummyWireService _dummy; MyDBOwner _myDBOwner; vespalib::ThreadStackExecutor _summaryExecutor; + MockSharedThreadingService _shared_service; HwInfo _hwInfo; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; DocumentDB::SP _db; @@ -142,6 +144,7 @@ Fixture::Fixture(bool file_config) _dummy(), _myDBOwner(), _summaryExecutor(8, 128_Ki), + _shared_service(_summaryExecutor, _summaryExecutor), _hwInfo(), _bucketExecutor(2), _db(), @@ -165,7 +168,7 @@ Fixture::Fixture(bool file_config) mgr.nextGeneration(0ms); _db = DocumentDB::create(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"), makeBucketSpace(), - *b->getProtonConfigSP(), _myDBOwner, _summaryExecutor, _summaryExecutor, _bucketExecutor, _tls, _dummy, + *b->getProtonConfigSP(), _myDBOwner, _shared_service, _bucketExecutor, _tls, _dummy, _fileHeaderContext, make_config_store(), std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), _hwInfo); _db->start(); diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 0882153edd6..62d86ce895d 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -1,19 +1,20 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "bm_node.h" #include "bm_cluster.h" #include "bm_cluster_params.h" #include "bm_message_bus.h" +#include "bm_node.h" #include "bm_node_stats.h" #include "bm_storage_chain_builder.h" #include "bm_storage_link_context.h" -#include "storage_api_chain_bm_feed_handler.h" -#include "storage_api_message_bus_bm_feed_handler.h" -#include "storage_api_rpc_bm_feed_handler.h" #include "document_api_message_bus_bm_feed_handler.h" #include "i_bm_distribution.h" #include "i_bm_feed_handler.h" #include "spi_bm_feed_handler.h" +#include "storage_api_chain_bm_feed_handler.h" +#include "storage_api_message_bus_bm_feed_handler.h" +#include "storage_api_rpc_bm_feed_handler.h" +#include <tests/proton/common/dummydbowner.h> #include <vespa/config-attributes.h> #include <vespa/config-bucketspaces.h> #include <vespa/config-imported-fields.h> @@ -39,18 +40,19 @@ #include <vespa/searchcore/proton/common/alloc_config.h> #include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/metrics/metricswireservice.h> -#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> +#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> #include <vespa/searchcore/proton/server/bootstrapconfig.h> -#include <vespa/searchcore/proton/server/documentdb.h> #include <vespa/searchcore/proton/server/document_db_maintenance_config.h> #include <vespa/searchcore/proton/server/document_meta_store_read_guards.h> +#include <vespa/searchcore/proton/server/documentdb.h> #include <vespa/searchcore/proton/server/documentdbconfigmanager.h> #include <vespa/searchcore/proton/server/fileconfigmanager.h> #include <vespa/searchcore/proton/server/memoryconfigstore.h> #include <vespa/searchcore/proton/server/persistencehandlerproxy.h> #include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> +#include <vespa/searchcore/proton/test/mock_shared_threading_service.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/searchlib/transactionlog/translogserver.h> #include <vespa/searchsummary/config/config-juniperrc.h> @@ -75,7 +77,6 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/size_literals.h> -#include <tests/proton/common/dummydbowner.h> #include <vespa/log/log.h> LOG_SETUP(".bmcluster.bm_node"); @@ -459,6 +460,7 @@ class MyBmNode : public BmNode proton::DummyWireService _metrics_wire_service; proton::MemoryConfigStores _config_stores; vespalib::ThreadStackExecutor _summary_executor; + proton::MockSharedThreadingService _shared_service; proton::DummyDBOwner _document_db_owner; BucketSpace _bucket_space; std::shared_ptr<DocumentDB> _document_db; @@ -523,6 +525,7 @@ MyBmNode::MyBmNode(const vespalib::string& base_dir, int base_port, uint32_t nod _metrics_wire_service(), _config_stores(), _summary_executor(8, 128_Ki), + _shared_service(_summary_executor, _summary_executor), _document_db_owner(), _bucket_space(document::test::makeBucketSpace(_doc_type_name.getName())), _document_db(), @@ -594,7 +597,7 @@ MyBmNode::create_document_db(const BmClusterParams& params) mgr.nextGeneration(0ms); _document_db = DocumentDB::create(_base_dir, mgr.getConfig(), _tls_spec, _query_limiter, _clock, _doc_type_name, _bucket_space, *bootstrap_config->getProtonConfigSP(), _document_db_owner, - _summary_executor, _summary_executor, *_persistence_engine, _tls, + _shared_service, *_persistence_engine, _tls, _metrics_wire_service, _file_header_context, _config_stores.getConfigStore(_doc_type_name.toString()), std::make_shared<vespalib::ThreadStackExecutor>(16, 128_Ki), HwInfo()); diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 511adbe66e9..efa22be6533 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -97,6 +97,8 @@ vespa_add_library(searchcore_server STATIC searchhandlerproxy.cpp searchview.cpp simpleflush.cpp + shared_threading_service.cpp + shared_threading_service_config.cpp storeonlydocsubdb.cpp storeonlyfeedview.cpp summaryadapter.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index e5bf5013528..d491f4ab364 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -7,6 +7,7 @@ #include "documentdb.h" #include "documentdbconfigscout.h" #include "feedhandler.h" +#include "i_shared_threading_service.h" #include "idocumentdbowner.h" #include "idocumentsubdb.h" #include "maintenance_jobs_injector.h" @@ -131,8 +132,7 @@ DocumentDB::create(const vespalib::string &baseDir, document::BucketSpace bucketSpace, const ProtonConfig &protonCfg, IDocumentDBOwner &owner, - vespalib::ThreadExecutor &warmupExecutor, - vespalib::ThreadExecutor &sharedExecutor, + ISharedThreadingService& shared_service, storage::spi::BucketExecutor &bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, @@ -143,7 +143,7 @@ DocumentDB::create(const vespalib::string &baseDir, { return DocumentDB::SP( new DocumentDB(baseDir, std::move(currentSnapshot), tlsSpec, queryLimiter, clock, docTypeName, bucketSpace, - protonCfg, owner, warmupExecutor, sharedExecutor, bucketExecutor, tlsWriterFactory, + protonCfg, owner, shared_service, bucketExecutor, tlsWriterFactory, metricsWireService, fileHeaderContext, std::move(config_store), initializeThreads, hwInfo)); } DocumentDB::DocumentDB(const vespalib::string &baseDir, @@ -155,8 +155,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, document::BucketSpace bucketSpace, const ProtonConfig &protonCfg, IDocumentDBOwner &owner, - vespalib::Executor &warmupExecutor, - vespalib::ThreadExecutor &sharedExecutor, + ISharedThreadingService& shared_service, storage::spi::BucketExecutor & bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, @@ -176,7 +175,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. _writeServiceConfig(configSnapshot->get_threading_service_config()), - _writeService(sharedExecutor, _writeServiceConfig, indexing_thread_stack_size), + _writeService(shared_service.shared(), _writeServiceConfig, indexing_thread_stack_size), _initializeThreads(std::move(initializeThreads)), _initConfigSnapshot(), _initConfigSerialNum(0u), @@ -204,9 +203,9 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _writeFilter(), _transient_usage_provider(std::make_shared<DocumentDBResourceUsageProvider>(*this)), _feedHandler(std::make_unique<FeedHandler>(_writeService, tlsSpec, docTypeName, *this, _writeFilter, *this, tlsWriterFactory)), - _subDBs(*this, *this, *_feedHandler, _docTypeName, _writeService, warmupExecutor, fileHeaderContext, + _subDBs(*this, *this, *_feedHandler, _docTypeName, _writeService, shared_service.warmup(), fileHeaderContext, metricsWireService, getMetrics(), queryLimiter, clock, _configMutex, _baseDir, hwInfo), - _maintenanceController(_writeService.master(), sharedExecutor, _refCount, _docTypeName), + _maintenanceController(_writeService.master(), shared_service.shared(), _refCount, _docTypeName), _jobTrackers(), _calc(), _metricsUpdater(_subDBs, _writeService, _jobTrackers, *_sessionManager, _writeFilter) diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index ee414db28bf..e829f477e8a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -47,12 +47,13 @@ namespace storage::spi { struct BucketExecutor; } namespace proton { class AttributeConfigInspector; +class ExecutorThreadingServiceStats; class IDocumentDBOwner; +class ISharedThreadingService; class ITransientResourceUsageProvider; -struct MetricsWireService; class StatusReport; -class ExecutorThreadingServiceStats; class TransientResourceUsageProvider; +struct MetricsWireService; namespace matching { class SessionManager; } @@ -200,8 +201,7 @@ private: document::BucketSpace bucketSpace, const ProtonConfig &protonCfg, IDocumentDBOwner &owner, - vespalib::Executor &warmupExecutor, - vespalib::ThreadExecutor &sharedExecutor, + ISharedThreadingService& shared_service, storage::spi::BucketExecutor &bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, @@ -232,8 +232,7 @@ public: document::BucketSpace bucketSpace, const ProtonConfig &protonCfg, IDocumentDBOwner &owner, - vespalib::ThreadExecutor &warmupExecutor, - vespalib::ThreadExecutor &sharedExecutor, + ISharedThreadingService& shared_service, storage::spi::BucketExecutor & bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h new file mode 100644 index 00000000000..5145dbec43e --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h @@ -0,0 +1,33 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace vespalib { class ThreadExecutor; } + +namespace proton { + +/** + * Interface containing the thread executors that are shared across all document dbs. + */ +class ISharedThreadingService { +public: + virtual ~ISharedThreadingService() {} + + /** + * Returns the executor used for warmup (e.g. index warmup). + */ + virtual vespalib::ThreadExecutor& warmup() = 0; + + /** + * Returns the shared executor used for various assisting tasks in a document db. + * + * Example usages include: + * - Disk index fusion. + * - Updating nearest neighbor index (in DenseTensorAttribute). + * - Loading nearest neighbor index (in DenseTensorAttribute). + * - Writing of data in the document store. + */ + virtual vespalib::ThreadExecutor& shared() = 0; +}; + +} + diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 5b21242e397..e056325e0d3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -111,15 +111,6 @@ diskMemUsageSamplerConfig(const ProtonConfig &proton, const HwInfo &hwInfo) hwInfo); } -size_t -derive_shared_threads(const ProtonConfig &proton, const HwInfo::Cpu &cpuInfo) { - size_t scaledCores = (size_t)std::ceil(cpuInfo.cores() * proton.feeding.concurrency); - - // We need at least 1 guaranteed free worker in order to ensure progress so #documentsdbs + 1 should suffice, - // but we will not be cheap and give it one extra. - return std::max(scaledCores, proton.documentdb.size() + proton.flush.maxconcurrent + 1); -} - uint32_t computeRpcTransportThreads(const ProtonConfig & cfg, const HwInfo::Cpu &cpuInfo) { bool areSearchAndDocsumAsync = cfg.docsum.async && cfg.search.async; @@ -144,8 +135,6 @@ struct MetricsUpdateHook : metrics::UpdateHook const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component"; -VESPA_THREAD_STACK_TAG(proton_shared_executor) -VESPA_THREAD_STACK_TAG(index_warmup_executor) VESPA_THREAD_STACK_TAG(initialize_executor) VESPA_THREAD_STACK_TAG(close_executor) @@ -240,8 +229,7 @@ Proton::Proton(const config::ConfigUri & configUri, _protonDiskLayout(), _protonConfigurer(_executor, *this, _protonDiskLayout), _protonConfigFetcher(configUri, _protonConfigurer, subscribeTimeout), - _warmupExecutor(), - _sharedExecutor(), + _shared_service(), _compile_cache_executor_binding(), _queryLimiter(), _clock(0.001), @@ -333,11 +321,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) protonConfig.visit.ignoremaxbytes); vespalib::string fileConfigId; - _warmupExecutor = std::make_unique<vespalib::ThreadStackExecutor>(4, 128_Ki, index_warmup_executor); - - const size_t sharedThreads = derive_shared_threads(protonConfig, hwInfo.cpu()); - _sharedExecutor = std::make_shared<vespalib::BlockingThreadStackExecutor>(sharedThreads, 128_Ki, sharedThreads*16, proton_shared_executor); - _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_sharedExecutor); + _shared_service = std::make_unique<SharedThreadingService>(SharedThreadingServiceConfig::make(protonConfig, hwInfo.cpu())); + _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki, initialize_executor); @@ -460,11 +445,9 @@ Proton::~Proton() if (_flushEngine) { _flushEngine->close(); } - if (_warmupExecutor) { - _warmupExecutor->sync(); - } - if (_sharedExecutor) { - _sharedExecutor->sync(); + if (_shared_service) { + _shared_service->warmup_raw().sync(); + _shared_service->shared_raw()->sync(); } if ( ! _documentDBMap.empty()) { @@ -483,9 +466,8 @@ Proton::~Proton() _documentDBMap.clear(); _persistenceEngine.reset(); _tls.reset(); - _warmupExecutor.reset(); _compile_cache_executor_binding.reset(); - _sharedExecutor.reset(); + _shared_service.reset(); _clock.stop(); LOG(debug, "Explicit destructor done"); } @@ -619,11 +601,23 @@ Proton::addDocumentDB(const document::DocumentType &docType, // 1 thread per document type. initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(1, 128_Ki); } - auto ret = DocumentDB::create(config.basedir + "/documents", documentDBConfig, config.tlsspec, - _queryLimiter, _clock, docTypeName, bucketSpace, config, *this, - *_warmupExecutor, *_sharedExecutor, *_persistenceEngine, *_tls->getTransLogServer(), - *_metricsEngine, _fileHeaderContext, std::move(config_store), - initializeThreads, bootstrapConfig->getHwInfo()); + auto ret = DocumentDB::create(config.basedir + "/documents", + documentDBConfig, + config.tlsspec, + _queryLimiter, + _clock, + docTypeName, + bucketSpace, + config, + *this, + *_shared_service, + *_persistenceEngine, + *_tls->getTransLogServer(), + *_metricsEngine, + _fileHeaderContext, + std::move(config_store), + initializeThreads, + bootstrapConfig->getHwInfo()); try { ret->start(); } catch (vespalib::Exception &e) { @@ -791,11 +785,9 @@ Proton::updateMetrics(const metrics::MetricLockGuard &) if (_summaryEngine) { updateExecutorMetrics(metrics.docsum, _summaryEngine->getExecutorStats()); } - if (_sharedExecutor) { - metrics.shared.update(_sharedExecutor->getStats()); - } - if (_warmupExecutor) { - metrics.warmup.update(_warmupExecutor->getStats()); + if (_shared_service) { + metrics.shared.update(_shared_service->shared().getStats()); + metrics.warmup.update(_shared_service->warmup().getStats()); } } } @@ -947,12 +939,12 @@ Proton::get_child(vespalib::stringref name) const return std::make_unique<ResourceUsageExplorer>(_diskMemUsageSampler->writeFilter(), _persistenceEngine->get_resource_usage_tracker()); } else if (name == THREAD_POOLS) { - return std::make_unique<ProtonThreadPoolsExplorer>(_sharedExecutor.get(), + return std::make_unique<ProtonThreadPoolsExplorer>((_shared_service) ? _shared_service->shared_raw().get() : nullptr, (_matchEngine) ? &_matchEngine->get_executor() : nullptr, (_summaryEngine) ? &_summaryEngine->get_executor() : nullptr, (_flushEngine) ? &_flushEngine->get_executor() : nullptr, &_executor, - _warmupExecutor.get()); + (_shared_service) ? &_shared_service->warmup() : nullptr); } return Explorer_UP(nullptr); } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 6fd31352051..c57d3e26d8b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -12,6 +12,8 @@ #include "proton_config_fetcher.h" #include "proton_configurer.h" #include "rpc_hooks.h" +#include "shared_threading_service.h" +#include <vespa/eval/eval/llvm/compile_cache.h> #include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/metrics/metrics_engine.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> @@ -24,7 +26,6 @@ #include <vespa/vespalib/net/json_handler_repo.h> #include <vespa/vespalib/net/state_explorer.h> #include <vespa/vespalib/util/varholder.h> -#include <vespa/eval/eval/llvm/compile_cache.h> #include <mutex> #include <shared_mutex> @@ -101,8 +102,7 @@ private: std::unique_ptr<IProtonDiskLayout> _protonDiskLayout; ProtonConfigurer _protonConfigurer; ProtonConfigFetcher _protonConfigFetcher; - std::unique_ptr<vespalib::ThreadStackExecutorBase> _warmupExecutor; - std::shared_ptr<vespalib::ThreadStackExecutorBase> _sharedExecutor; + std::unique_ptr<SharedThreadingService> _shared_service; vespalib::eval::CompileCache::ExecutorBinding::UP _compile_cache_executor_binding; matching::QueryLimiter _queryLimiter; vespalib::Clock _clock; diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp new file mode 100644 index 00000000000..04e775674b4 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "shared_threading_service.h" +#include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> + +VESPA_THREAD_STACK_TAG(proton_shared_executor) +VESPA_THREAD_STACK_TAG(proton_warmup_executor) + +namespace proton { + +SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg) + : _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor), + _shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki, + cfg.shared_task_limit(), proton_shared_executor)) +{ +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h new file mode 100644 index 00000000000..ef0ff31c389 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h @@ -0,0 +1,30 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "i_shared_threading_service.h" +#include "shared_threading_service_config.h" +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/syncable.h> +#include <memory> + +namespace proton { + +/** + * Class containing the thread executors that are shared across all document dbs. + */ +class SharedThreadingService : public ISharedThreadingService { +private: + vespalib::ThreadStackExecutor _warmup; + std::shared_ptr<vespalib::SyncableThreadExecutor> _shared; + +public: + SharedThreadingService(const SharedThreadingServiceConfig& cfg); + + vespalib::SyncableThreadExecutor& warmup_raw() { return _warmup; } + std::shared_ptr<vespalib::SyncableThreadExecutor> shared_raw() { return _shared; } + + vespalib::ThreadExecutor& warmup() override { return _warmup; } + vespalib::ThreadExecutor& shared() override { return *_shared; } +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp new file mode 100644 index 00000000000..cf62cf3b76c --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp @@ -0,0 +1,41 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "shared_threading_service_config.h" +#include <vespa/searchcore/config/config-proton.h> +#include <cmath> + +namespace proton { + +using ProtonConfig = SharedThreadingServiceConfig::ProtonConfig; + +SharedThreadingServiceConfig::SharedThreadingServiceConfig(uint32_t shared_threads_in, + uint32_t shared_task_limit_in, + uint32_t warmup_threads_in) + : _shared_threads(shared_threads_in), + _shared_task_limit(shared_task_limit_in), + _warmup_threads(warmup_threads_in) +{ +} + +namespace { + +size_t +derive_shared_threads(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info) +{ + size_t scaled_cores = (size_t)std::ceil(cpu_info.cores() * cfg.feeding.concurrency); + + // We need at least 1 guaranteed free worker in order to ensure progress. + return std::max(scaled_cores, cfg.documentdb.size() + cfg.flush.maxconcurrent + 1); +} + +} + +SharedThreadingServiceConfig +SharedThreadingServiceConfig::make(const proton::SharedThreadingServiceConfig::ProtonConfig& cfg, + const proton::HwInfo::Cpu& cpu_info) +{ + size_t shared_threads = derive_shared_threads(cfg, cpu_info); + return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, 4); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h new file mode 100644 index 00000000000..02966e0efeb --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h @@ -0,0 +1,36 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcore/proton/common/hw_info.h> + +namespace vespa::config::search::core::internal { class InternalProtonType; } + +namespace proton { + +/** + * Config for the thread executors that are shared across all document dbs. + */ +class SharedThreadingServiceConfig { +public: + using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType; + +private: + uint32_t _shared_threads; + uint32_t _shared_task_limit; + uint32_t _warmup_threads; + +public: + SharedThreadingServiceConfig(uint32_t shared_threads_in, + uint32_t shared_task_limit_in, + uint32_t warmup_threads_in); + + static SharedThreadingServiceConfig make(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info); + + uint32_t shared_threads() const { return _shared_threads; } + uint32_t shared_task_limit() const { return _shared_task_limit; } + uint32_t warmup_threads() const { return _warmup_threads; } + +}; + +} + diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h new file mode 100644 index 00000000000..f21f43ed5ad --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h @@ -0,0 +1,23 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcore/proton/server/i_shared_threading_service.h> + +namespace proton { + +class MockSharedThreadingService : public ISharedThreadingService { +private: + vespalib::ThreadExecutor& _warmup; + vespalib::ThreadExecutor& _shared; + +public: + MockSharedThreadingService(vespalib::ThreadExecutor& warmup_in, + vespalib::ThreadExecutor& shared_in) + : _warmup(warmup_in), + _shared(shared_in) + {} + vespalib::ThreadExecutor& warmup() override { return _warmup; } + vespalib::ThreadExecutor& shared() override { return _shared; } +}; + +} |