diff options
author | Geir Storli <geirst@yahooinc.com> | 2022-01-27 14:48:52 +0000 |
---|---|---|
committer | Geir Storli <geirst@yahooinc.com> | 2022-01-27 15:28:21 +0000 |
commit | 557040ad59cb7e2d6269c0451ef76acaec22a2b9 (patch) | |
tree | 42a3bf9e83f13561baac08c8bc73c3a3b5a5f9a1 | |
parent | 65f52f88b1fc452aff7c99739f0e32b35a3a6be4 (diff) |
Tag proton thread pools with CPU categories.
5 files changed, 51 insertions, 34 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 2b76faa8d7f..011d97d4609 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -4,20 +4,21 @@ #include "flush_all_strategy.h" #include "flushengine.h" #include "flushtask.h" -#include "tls_stats_map.h" #include "tls_stats_factory.h" +#include "tls_stats_map.h" #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/searchlib/common/flush_token.h> -#include <vespa/vespalib/util/jsonwriter.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/size_literals.h> #include <thread> #include <vespa/log/log.h> LOG_SETUP(".proton.flushengine.flushengine"); -typedef vespalib::Executor::Task Task; -using searchcorespi::IFlushTarget; +using Task = vespalib::Executor::Task; using searchcorespi::FlushStats; +using searchcorespi::IFlushTarget; +using vespalib::CpuUsage; using namespace std::chrono_literals; namespace proton { @@ -86,7 +87,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats _threadPool(128_Ki), _strategy(std::move(strategy)), _priorityStrategy(), - _executor(numThreads, 128_Ki, flush_engine_executor), + _executor(numThreads, 128_Ki, CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)), _lock(), _cond(), _handlers(), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index ffc142b6155..ca6b3d9ba0f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -3,14 +3,16 @@ #include "executorthreadingservice.h" #include "threading_service_config.h" #include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/singleexecutor.h> -#include <vespa/vespalib/util/blockingthreadstackexecutor.h> -using vespalib::SyncableThreadExecutor; using vespalib::BlockingThreadStackExecutor; -using vespalib::SingleExecutor; +using vespalib::CpuUsage; using vespalib::SequencedTaskExecutor; +using vespalib::SingleExecutor; +using vespalib::SyncableThreadExecutor; using OptimizeFor = vespalib::Executor::OptimizeFor; using SharedFieldWriterExecutor = proton::ThreadingServiceConfig::SharedFieldWriterExecutor; @@ -49,11 +51,13 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExe uint32_t stackSize) : _sharedExecutor(sharedExecutor), - _masterExecutor(1, stackSize, master_executor), + _masterExecutor(1, stackSize, CpuUsage::wrap(master_executor, CpuUsage::Category::WRITE)), _shared_field_writer(cfg.shared_field_writer()), _master_task_limit(cfg.master_task_limit()), - _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)), - _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)), + _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), + CpuUsage::wrap(index_executor, CpuUsage::Category::WRITE))), + _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), + CpuUsage::wrap(summary_executor, CpuUsage::Category::WRITE))), _masterService(_masterExecutor), _indexService(*_indexExecutor), _indexFieldInverter(), @@ -70,8 +74,10 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExe _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_summaryExecutor.get()](){ executor->wakeup();})); } if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) { - _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); - _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), + _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); + _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); @@ -81,7 +87,8 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExe _attribute_field_writer_ptr = _attributeFieldWriter.get(); } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) { - _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), + _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();})); @@ -95,9 +102,12 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExe _index_field_writer_ptr = field_writer; _attribute_field_writer_ptr = field_writer; } else { - _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); - _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); - _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), + _indexFieldInverter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_inverter_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads(), cfg.defaultTaskLimit()); + _indexFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads(), cfg.defaultTaskLimit()); + _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE), + cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index f2bd68e53d7..d1b760e15f2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -24,10 +24,10 @@ #include <vespa/searchcore/proton/flushengine/flushengine.h> #include <vespa/searchcore/proton/flushengine/tls_stats_factory.h> #include <vespa/searchcore/proton/matchengine/matchengine.h> +#include <vespa/searchcore/proton/metrics/content_proton_metrics.h> +#include <vespa/searchcore/proton/metrics/metrics_engine.h> #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> #include <vespa/searchcore/proton/reference/document_db_reference_registry.h> -#include <vespa/searchcore/proton/metrics/metrics_engine.h> -#include <vespa/searchcore/proton/metrics/content_proton_metrics.h> #include <vespa/searchcore/proton/summaryengine/summaryengine.h> #include <vespa/searchlib/common/packets.h> #include <vespa/searchlib/transactionlog/trans_log_server_explorer.h> @@ -36,13 +36,13 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/net/state_server.h> #include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/host_name.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> #include <vespa/vespalib/util/random.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/vespalib/util/invokeserviceimpl.h> #ifdef __linux__ #include <malloc.h> #endif @@ -54,20 +54,21 @@ #include <vespa/log/log.h> LOG_SETUP(".proton.server.proton"); +using CpuCategory = vespalib::CpuUsage::Category; + using document::DocumentTypeRepo; +using search::engine::MonitorReply; +using search::transactionlog::DomainStats; +using vespa::config::search::core::ProtonConfig; +using vespa::config::search::core::internal::InternalProtonType; +using vespalib::CpuUsage; using vespalib::FileHeader; using vespalib::IllegalStateException; using vespalib::Slime; +using vespalib::compression::CompressionConfig; using vespalib::makeLambdaTask; using vespalib::slime::ArrayInserter; using vespalib::slime::Cursor; -using CpuCategory = vespalib::CpuUsage::Category; - -using search::transactionlog::DomainStats; -using vespa::config::search::core::ProtonConfig; -using vespa::config::search::core::internal::InternalProtonType; -using vespalib::compression::CompressionConfig; -using search::engine::MonitorReply; namespace proton { @@ -140,8 +141,8 @@ struct MetricsUpdateHook : metrics::UpdateHook const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component"; -VESPA_THREAD_STACK_TAG(initialize_executor) -VESPA_THREAD_STACK_TAG(close_executor) +VESPA_THREAD_STACK_TAG(proton_initialize_executor) +VESPA_THREAD_STACK_TAG(proton_close_executor) } @@ -331,7 +332,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw()); InitializeThreads initializeThreads; if (protonConfig.initialize.threads > 0) { - initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki, initialize_executor); + initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki, + CpuUsage::wrap(proton_initialize_executor, CpuCategory::SETUP)); _initDocumentDbsInSequence = (protonConfig.initialize.threads == 1); } _protonConfigurer.applyInitialConfig(initializeThreads); @@ -465,7 +467,8 @@ Proton::~Proton() } } - vespalib::ThreadStackExecutor closePool(std::min(_documentDBMap.size(), numCores), 0x20000, close_executor); + vespalib::ThreadStackExecutor closePool(std::min(_documentDBMap.size(), numCores), 0x20000, + CpuUsage::wrap(proton_close_executor, CpuCategory::SETUP)); closeDocumentDBs(closePool); } _documentDBMap.clear(); diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index 5a3eaa02d3b..7e799e506e3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -2,10 +2,12 @@ #include "shared_threading_service.h" #include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/vespalib/util/invokeserviceimpl.h> + +using vespalib::CpuUsage; VESPA_THREAD_STACK_TAG(proton_field_writer_executor) VESPA_THREAD_STACK_TAG(proton_shared_executor) @@ -16,7 +18,7 @@ namespace proton { using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor; SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg) - : _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor), + : _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)), _shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki, cfg.shared_task_limit(), proton_shared_executor)), _field_writer(), @@ -25,7 +27,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi { const auto& fw_cfg = cfg.field_writer_config(); if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) { - _field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor, + _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE), fw_cfg.indexingThreads() * 3, fw_cfg.defaultTaskLimit(), fw_cfg.is_task_limit_hard(), diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h index 6fd1a370887..fd270d194b4 100644 --- a/vespalib/src/vespa/vespalib/util/cpu_usage.h +++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h @@ -1,4 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once #include "runnable.h" #include "executor.h" |