diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-06 18:24:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-06 18:24:05 +0200 |
commit | 8c0427bd8b0de46d8c61f259f80aa3b81bfa128c (patch) | |
tree | 5b044748ebb96c39a0a0a9a90d01f5e7e57e9a04 /searchcore | |
parent | 4bcfe2688131797449e20b8400228f75a95bc000 (diff) | |
parent | a0c814fafbedba6edc8e730dfca1363623bf3437 (diff) |
Merge pull request #3675 from vespa-engine/balder/summary.numthreads-2-backgroundnumthreads-rebased-1
Balder/summary.numthreads 2 backgroundnumthreads rebased 1
Diffstat (limited to 'searchcore')
3 files changed, 42 insertions, 40 deletions
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 594a8717d7f..8c01c45dac8 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -256,6 +256,8 @@ summary.log.maxbucketspread double default=2.5 summary.log.minfilesizefactor double default=0.2 ## Number of threads used for compressing incomming documents/compacting. +## Deprecated. Use background.threads instead. +## TODO Remove summary.log.numthreads int default=8 restart ## Control io options during flush of stored documents. @@ -376,6 +378,11 @@ visit.ignoremaxbytes bool default=true ## When set to 0 (default) we use 1 separate thread per document database. initialize.threads int default = 0 +## Number of worker threads doing background compaction/compression tasks. +## They all live i a shared thread pool. +## When set to 0 (default), it will have enough threads to saturate half of the cores. +background.threads int default=0 + ## Portion of enumstore address space that can be used before put and update ## portion of feed is blocked. writefilter.attribute.enumstorelimit double default = 0.9 diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index f45fc658129..6aea5234fbe 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -170,9 +170,8 @@ deriveConfig(const ProtonConfig::Summary & summary, const ProtonConfig::Flush::M logConfig.setMaxFileSize(log.maxfilesize) .setMaxDiskBloatFactor(std::min(flush.diskbloatfactor, flush.each.diskbloatfactor)) .setMaxBucketSpread(log.maxbucketspread).setMinFileSizeFactor(log.minfilesizefactor) - .setNumThreads(log.numthreads).compact2ActiveFile(log.compact2activefile) - .compactCompression(deriveCompression(log.compact.compression)).setFileConfig(fileConfig) - .disableCrcOnRead(chunk.skipcrconread); + .compact2ActiveFile(log.compact2activefile).compactCompression(deriveCompression(log.compact.compression)) + .setFileConfig(fileConfig).disableCrcOnRead(chunk.skipcrconread); return LogDocumentStore::Config(config, logConfig); } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 7caed5198bd..5863c03226c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -85,12 +85,24 @@ diskMemUsageSamplerConfig(const ProtonConfig &proton, const HwInfo &hwInfo) hwInfo); } +size_t +deriveBackgroundThreads(const ProtonConfig & proton) { + size_t threads = std::thread::hardware_concurrency(); + if (proton.background.threads != 0) { + threads = proton.background.threads; + } + threads = std::max(threads, size_t(proton.summary.log.numthreads)); + + // We need at least 1 guaranteed free worker in order to ensure progress so #documentsdbs + 1 should suffice, + // but we will not be cheap and give #documentsdbs * 2 + return std::max(threads, proton.documentdb.size() * 2);; } -static const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component"; +const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component"; -Proton::ProtonFileHeaderContext::ProtonFileHeaderContext(const Proton &proton_, - const vespalib::string &creator) +} + +Proton::ProtonFileHeaderContext::ProtonFileHeaderContext(const Proton &proton_, const vespalib::string &creator) : _proton(proton_), _hostName(), _creator(creator), @@ -208,7 +220,7 @@ Proton::init() { assert( ! _initStarted && ! _initComplete ); _initStarted = true; - if (_threadPool.NewThread(&_clock, NULL) == NULL) { + if (_threadPool.NewThread(&_clock, nullptr) == nullptr) { throw IllegalStateException("Failed starting thread for the cheap clock"); } _protonConfigFetcher.start(); @@ -234,8 +246,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) hwDiskCfg.samplewritesize, hwDiskCfg.shared, hwMemoryCfg.size); - _hwInfoSampler = std::make_unique<HwInfoSampler>(protonConfig.basedir, - samplerCfg); + _hwInfoSampler = std::make_unique<HwInfoSampler>(protonConfig.basedir, samplerCfg); _hwInfo = _hwInfoSampler->hwInfo(); setFS4Compression(protonConfig); _diskMemUsageSampler = std::make_unique<DiskMemUsageSampler> @@ -244,8 +255,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _metricsEngine.reset(new MetricsEngine()); _metricsEngine->addMetricsHook(_metricsHook); - _fileHeaderContext.setClusterName(protonConfig.clustername, - protonConfig.basedir); + _fileHeaderContext.setClusterName(protonConfig.clustername, protonConfig.basedir); _tls.reset(new TLS(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext)); _matchEngine.reset(new MatchEngine(protonConfig.numsearcherthreads, protonConfig.numthreadspersearch, @@ -257,7 +267,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) const ProtonConfig::Flush & flush(protonConfig.flush); switch (flush.strategy) { case ProtonConfig::Flush::MEMORY: { - MemoryFlush::SP memoryFlush = std::make_shared<MemoryFlush>( + auto memoryFlush = std::make_shared<MemoryFlush>( MemoryFlushConfigUpdater::convertConfig(flush.memory), fastos::ClockSystem::now()); _memoryFlushConfigUpdater = std::make_unique<MemoryFlushConfigUpdater>(memoryFlush, flush.memory); _diskMemUsageSampler->notifier().addDiskMemUsageListener(_memoryFlushConfigUpdater.get()); @@ -290,14 +300,12 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) vespalib::string fileConfigId; _warmupExecutor.reset(new vespalib::ThreadStackExecutor(4, 128*1024)); - // We need at least 1 guaranteed free worker in order to ensure progress so #documentsdbs + 1 should suffice, - // but we will not be cheap and give #documentsdbs * 2 - const size_t summaryThreads = std::max(size_t(protonConfig.summary.log.numthreads), protonConfig.documentdb.size() * 2); + + const size_t summaryThreads = deriveBackgroundThreads(protonConfig); _summaryExecutor.reset(new vespalib::BlockingThreadStackExecutor(summaryThreads, 128*1024, summaryThreads*16)); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { - initializeThreads = std::make_shared<vespalib::ThreadStackExecutor> - (protonConfig.initialize.threads, 128 * 1024); + initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128 * 1024); _initDocumentDbsInSequence = (protonConfig.initialize.threads == 1); } _protonConfigurer.applyInitialConfig(initializeThreads); @@ -563,10 +571,8 @@ Proton::addDocumentDB(const document::DocumentType &docType, try { ret->start(); } catch (vespalib::Exception &e) { - LOG(warning, - "Failed to start database for document type '%s'; %s", - docTypeName.toString().c_str(), - e.what()); + LOG(warning, "Failed to start database for document type '%s'; %s", + docTypeName.toString().c_str(), e.what()); return DocumentDB::SP(); } // Wait for replay done on document dbs added due to reconfigs, since engines are already up and running. @@ -577,14 +583,12 @@ Proton::addDocumentDB(const document::DocumentType &docType, _metricsEngine->addDocumentDBMetrics(ret->getMetricsCollection()); _metricsEngine->addMetricsHook(ret->getMetricsUpdateHook()); _documentDBMap[docTypeName] = ret; - if (_persistenceEngine.get() != NULL) { + if (_persistenceEngine) { // Not allowed to get to service layer to call pause(). std::unique_lock<std::shared_timed_mutex> persistenceWGuard(_persistenceEngine->getWLock()); - PersistenceHandlerProxy::SP - persistenceHandler(new PersistenceHandlerProxy(ret)); + auto persistenceHandler = std::make_shared<PersistenceHandlerProxy>(ret); if (!_isInitializing) { - _persistenceEngine-> - propagateSavedClusterState(*persistenceHandler); + _persistenceEngine->propagateSavedClusterState(*persistenceHandler); _persistenceEngine->populateInitialBucketDB(bucketSpace, *persistenceHandler); } // TODO: Fix race with new cluster state setting. @@ -649,10 +653,7 @@ Proton::ping(MonitorRequest::UP request, MonitorClient & client) BootstrapConfig::SP configSnapshot = getActiveConfigSnapshot(); const ProtonConfig &protonConfig = configSnapshot->getProtonConfig(); ret.partid = protonConfig.partition; - if (_matchEngine->isOnline()) - ret.timestamp = 42; // change to flush caches on tld/qrs - else - ret.timestamp = 0; + ret.timestamp = (_matchEngine->isOnline()) ? 42 : 0; ret.activeDocs = getNumActiveDocs(); ret.activeDocsRequested = request->reportActiveDocs; return reply; @@ -661,7 +662,7 @@ Proton::ping(MonitorRequest::UP request, MonitorClient & client) bool Proton::triggerFlush() { - if ((_flushEngine.get() == NULL) || ! _flushEngine->HasThread()) { + if (!_flushEngine || ! _flushEngine->HasThread()) { return false; } _flushEngine->triggerFlush(); @@ -682,13 +683,12 @@ createPrepareRestartConfig(const ProtonConfig &protonConfig) bool Proton::prepareRestart() { - if ((_flushEngine.get() == NULL) || ! _flushEngine->HasThread()) { + if (!_flushEngine || ! _flushEngine->HasThread()) { return false; } BootstrapConfig::SP configSnapshot = getActiveConfigSnapshot(); - IFlushStrategy::SP strategy = - std::make_shared<PrepareRestartFlushStrategy>( - createPrepareRestartConfig(configSnapshot->getProtonConfig())); + auto strategy = std::make_shared<PrepareRestartFlushStrategy>( + createPrepareRestartConfig(configSnapshot->getProtonConfig())); _flushEngine->setStrategy(strategy); return true; } @@ -700,7 +700,7 @@ int countOpenFiles() static const char * const fd_dir_name = "/proc/self/fd"; int count = 0; DIR *dp = opendir(fd_dir_name); - if (dp != NULL) { + if (dp != nullptr) { struct dirent *ptr; while ((ptr = readdir(dp)) != nullptr) { if (strcmp(".", ptr->d_name) == 0) continue; @@ -738,10 +738,6 @@ Proton::updateMetrics(const vespalib::MonitorGuard &) } } -namespace { -const std::string config_id_tag = "CONFIG ID"; -} // namespace - void Proton::waitForInitDone() { |