diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-01-29 09:05:07 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-29 09:05:07 +0100 |
commit | 438f9649bc72e90392b43bd9f42055d4cceffd42 (patch) | |
tree | 3bf5ff86142a977704b385d7f18089030b25a7db /searchcore | |
parent | bd0bd7b87dc94852d5a82b0e16254294d8904ddb (diff) | |
parent | 1adf3a39dad82d07c2d0db9010d75de3d5249608 (diff) |
Merge pull request #8263 from vespa-engine/balder/use-correct-concurrency
Balder/use correct concurrency.
Diffstat (limited to 'searchcore')
4 files changed, 42 insertions, 30 deletions
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp index 658ebe818eb..340619f09bd 100644 --- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp @@ -22,11 +22,10 @@ struct Fixture { builder.indexing.threads = baseLineIndexingThreads; builder.indexing.tasklimit = 500; builder.indexing.semiunboundtasklimit = 50000; - builder.feeding.concurrency = 0.5; return builder; } ThreadingServiceConfig make(uint32_t cpuCores) { - return ThreadingServiceConfig::make(cfg, HwInfo::Cpu(cpuCores)); + return ThreadingServiceConfig::make(cfg, 0.5, HwInfo::Cpu(cpuCores)); } void assertIndexingThreads(uint32_t expIndexingThreads, uint32_t cpuCores) { EXPECT_EQUAL(expIndexingThreads, make(cpuCores).indexingThreads()); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 6a1d053745f..352de412f79 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -68,20 +68,20 @@ namespace proton { namespace { constexpr uint32_t indexing_thread_stack_size = 128 * 1024; +using Allocation = ProtonConfig::Documentdb::Allocation; GrowStrategy -makeGrowStrategy(uint32_t docsInitialCapacity, const ProtonConfig::Grow &growCfg) +makeGrowStrategy(uint32_t docsInitialCapacity, const Allocation &allocCfg) { - return GrowStrategy(docsInitialCapacity, growCfg.factor, growCfg.add, growCfg.multivalueallocfactor); + return GrowStrategy(docsInitialCapacity, allocCfg.growfactor, allocCfg.growbias, allocCfg.multivaluegrowfactor); } DocumentSubDBCollection::Config -makeSubDBConfig(const ProtonConfig & protonCfg) { - const ProtonConfig::Grow & growCfg = protonCfg.grow; - const ProtonConfig::Distribution & distCfg = protonCfg.distribution; - GrowStrategy searchableGrowth = makeGrowStrategy(growCfg.initial * distCfg.searchablecopies, growCfg); - GrowStrategy removedGrowth = makeGrowStrategy(std::max(1024l, growCfg.initial/100), growCfg); - GrowStrategy notReadyGrowth = makeGrowStrategy(growCfg.initial * (distCfg.redundancy - distCfg.searchablecopies), growCfg); - return DocumentSubDBCollection::Config(searchableGrowth, notReadyGrowth, removedGrowth, growCfg.numdocs, protonCfg.numsearcherthreads); +makeSubDBConfig(const ProtonConfig::Distribution & distCfg, const Allocation & allocCfg, size_t numSearcherThreads) { + size_t initialNumDocs(allocCfg.initialnumdocs); + GrowStrategy searchableGrowth = makeGrowStrategy(initialNumDocs * distCfg.searchablecopies, allocCfg); + GrowStrategy removedGrowth = makeGrowStrategy(std::max(1024ul, initialNumDocs/100), allocCfg); + GrowStrategy notReadyGrowth = makeGrowStrategy(initialNumDocs * (distCfg.redundancy - distCfg.searchablecopies), allocCfg); + return DocumentSubDBCollection::Config(searchableGrowth, notReadyGrowth, removedGrowth, allocCfg.amortizecount, numSearcherThreads); } index::IndexConfig @@ -89,6 +89,18 @@ makeIndexConfig(const ProtonConfig::Index & cfg) { return index::IndexConfig(WarmupConfig(cfg.warmup.time, cfg.warmup.unpack), cfg.maxflushed, cfg.cache.size); } +ProtonConfig::Documentdb _G_defaultProtonDocumentDBConfig; + +const ProtonConfig::Documentdb * +findDocumentDB(const ProtonConfig::DocumentdbVector & documentDBs, const vespalib::string & docType) { + for (const auto & dbCfg : documentDBs) { + if (dbCfg.inputdoctypename == docType) { + return & dbCfg; + } + } + return &_G_defaultProtonDocumentDBConfig; +} + } template <typename FunctionType> @@ -124,7 +136,10 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _bucketSpace(bucketSpace), _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. - _writeServiceConfig(ThreadingServiceConfig::make(protonCfg, hwInfo.cpu())), + _writeServiceConfig( + ThreadingServiceConfig::make(protonCfg, + findDocumentDB(protonCfg.documentdb, docTypeName.getName())->feeding.concurrency, + hwInfo.cpu())), _writeService(_writeServiceConfig.indexingThreads(), indexing_thread_stack_size, _writeServiceConfig.defaultTaskLimit()), @@ -142,7 +157,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _bucketHandler(_writeService.master()), _indexCfg(makeIndexConfig(protonCfg.index)), _config_store(std::move(config_store)), - _sessionManager(new matching::SessionManager(protonCfg.grouping.sessionmanager.maxentries)), + _sessionManager(std::make_shared<matching::SessionManager>(protonCfg.grouping.sessionmanager.maxentries)), _metricsWireService(metricsWireService), _metricsHook(*this, _docTypeName.getName(), protonCfg.numthreadspersearch), _feedView(), @@ -153,9 +168,12 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _dmUsageForwarder(_writeService.master()), _writeFilter(), _feedHandler(_writeService, tlsSpec, docTypeName, _state, *this, _writeFilter, *this, tlsDirectWriter), - _subDBs(*this, *this, _feedHandler, _docTypeName, _writeService, warmupExecutor, - sharedExecutor, fileHeaderContext, metricsWireService, getMetrics(), - queryLimiter, clock, _configMutex, _baseDir, makeSubDBConfig(protonCfg), hwInfo), + _subDBs(*this, *this, _feedHandler, _docTypeName, _writeService, warmupExecutor, sharedExecutor, fileHeaderContext, + metricsWireService, getMetrics(), queryLimiter, clock, _configMutex, _baseDir, + makeSubDBConfig(protonCfg.distribution, + findDocumentDB(protonCfg.documentdb, docTypeName.getName())->allocation, + protonCfg.numsearcherthreads), + hwInfo), _maintenanceController(_writeService.master(), sharedExecutor, _docTypeName), _visibility(_feedHandler, _writeService, _feedView), _lidSpaceCompactionHandlers(), @@ -165,8 +183,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, { assert(configSnapshot); - LOG(debug, "DocumentDB(%s): Creating database in directory '%s'", - _docTypeName.toString().c_str(), _baseDir.c_str()); + LOG(debug, "DocumentDB(%s): Creating database in directory '%s'", _docTypeName.toString().c_str(), _baseDir.c_str()); _feedHandler.init(_config_store->getOldestSerialNum()); _feedHandler.setBucketDBHandler(&_subDBs.getBucketDBHandler()); diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index cd1608feadb..8f1c3560e9b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -20,21 +20,20 @@ ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, namespace { uint32_t -calculateIndexingThreads(const ProtonConfig &cfg, const HwInfo::Cpu &cpuInfo) +calculateIndexingThreads(uint32_t cfgIndexingThreads, double concurrency, const HwInfo::Cpu &cpuInfo) { - double scaledCores = cpuInfo.cores() * cfg.feeding.concurrency; - uint32_t indexingThreads = std::max((uint32_t)std::ceil(scaledCores / 3), (uint32_t)cfg.indexing.threads); + double scaledCores = cpuInfo.cores() * concurrency; + uint32_t indexingThreads = std::max((uint32_t)std::ceil(scaledCores / 3), cfgIndexingThreads); return std::max(indexingThreads, 1u); } } ThreadingServiceConfig -ThreadingServiceConfig::make(const ProtonConfig &cfg, const HwInfo::Cpu &cpuInfo) +ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo) { - uint32_t indexingThreads = calculateIndexingThreads(cfg, cpuInfo); - return ThreadingServiceConfig(indexingThreads, - cfg.indexing.tasklimit, + uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing.threads, concurrency, cpuInfo); + return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit, (cfg.indexing.semiunboundtasklimit / indexingThreads)); } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index 03c0c4d1842..be39f516598 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -20,13 +20,10 @@ private: uint32_t _semiUnboundTaskLimit; private: - ThreadingServiceConfig(uint32_t indexingThreads_, - uint32_t defaultTaskLimit_, - uint32_t semiUnboundTaskLimit_); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_); public: - static ThreadingServiceConfig make(const ProtonConfig &cfg, - const HwInfo::Cpu &cpuInfo); + static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); uint32_t indexingThreads() const { return _indexingThreads; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } |