summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2022-01-27 14:48:52 +0000
committerGeir Storli <geirst@yahooinc.com>2022-01-27 15:28:21 +0000
commit557040ad59cb7e2d6269c0451ef76acaec22a2b9 (patch)
tree42a3bf9e83f13561baac08c8bc73c3a3b5a5f9a1
parent65f52f88b1fc452aff7c99739f0e32b35a3a6be4 (diff)
Tag proton thread pools with CPU categories.
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp34
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp31
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/util/cpu_usage.h1
5 files changed, 51 insertions, 34 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 2b76faa8d7f..011d97d4609 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -4,20 +4,21 @@
#include "flush_all_strategy.h"
#include "flushengine.h"
#include "flushtask.h"
-#include "tls_stats_map.h"
#include "tls_stats_factory.h"
+#include "tls_stats_map.h"
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/searchlib/common/flush_token.h>
-#include <vespa/vespalib/util/jsonwriter.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/size_literals.h>
#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".proton.flushengine.flushengine");
-typedef vespalib::Executor::Task Task;
-using searchcorespi::IFlushTarget;
+using Task = vespalib::Executor::Task;
using searchcorespi::FlushStats;
+using searchcorespi::IFlushTarget;
+using vespalib::CpuUsage;
using namespace std::chrono_literals;
namespace proton {
@@ -86,7 +87,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats
_threadPool(128_Ki),
_strategy(std::move(strategy)),
_priorityStrategy(),
- _executor(numThreads, 128_Ki, flush_engine_executor),
+ _executor(numThreads, 128_Ki, CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)),
_lock(),
_cond(),
_handlers(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index ffc142b6155..ca6b3d9ba0f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -3,14 +3,16 @@
#include "executorthreadingservice.h"
#include "threading_service_config.h"
#include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/singleexecutor.h>
-#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
-using vespalib::SyncableThreadExecutor;
using vespalib::BlockingThreadStackExecutor;
-using vespalib::SingleExecutor;
+using vespalib::CpuUsage;
using vespalib::SequencedTaskExecutor;
+using vespalib::SingleExecutor;
+using vespalib::SyncableThreadExecutor;
using OptimizeFor = vespalib::Executor::OptimizeFor;
using SharedFieldWriterExecutor = proton::ThreadingServiceConfig::SharedFieldWriterExecutor;
@@ -49,11 +51,13 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExe
uint32_t stackSize)
: _sharedExecutor(sharedExecutor),
- _masterExecutor(1, stackSize, master_executor),
+ _masterExecutor(1, stackSize, CpuUsage::wrap(master_executor, CpuUsage::Category::WRITE)),
_shared_field_writer(cfg.shared_field_writer()),
_master_task_limit(cfg.master_task_limit()),
- _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)),
- _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)),
+ _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(),
+ CpuUsage::wrap(index_executor, CpuUsage::Category::WRITE))),
+ _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(),
+ CpuUsage::wrap(summary_executor, CpuUsage::Category::WRITE))),
_masterService(_masterExecutor),
_indexService(*_indexExecutor),
_indexFieldInverter(),
@@ -70,8 +74,10 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExe
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_summaryExecutor.get()](){ executor->wakeup();}));
}
if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) {
- _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
- _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
+ _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
+ _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads(), cfg.defaultTaskLimit(),
cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
@@ -81,7 +87,8 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExe
_attribute_field_writer_ptr = _attributeFieldWriter.get();
} else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
- _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
+ _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();}));
@@ -95,9 +102,12 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExe
_index_field_writer_ptr = field_writer;
_attribute_field_writer_ptr = field_writer;
} else {
- _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
- _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
- _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
+ _indexFieldInverter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_inverter_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads(), cfg.defaultTaskLimit());
+ _indexFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads(), cfg.defaultTaskLimit());
+ _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE),
+ cfg.indexingThreads(), cfg.defaultTaskLimit(),
cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index f2bd68e53d7..d1b760e15f2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -24,10 +24,10 @@
#include <vespa/searchcore/proton/flushengine/flushengine.h>
#include <vespa/searchcore/proton/flushengine/tls_stats_factory.h>
#include <vespa/searchcore/proton/matchengine/matchengine.h>
+#include <vespa/searchcore/proton/metrics/content_proton_metrics.h>
+#include <vespa/searchcore/proton/metrics/metrics_engine.h>
#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
#include <vespa/searchcore/proton/reference/document_db_reference_registry.h>
-#include <vespa/searchcore/proton/metrics/metrics_engine.h>
-#include <vespa/searchcore/proton/metrics/content_proton_metrics.h>
#include <vespa/searchcore/proton/summaryengine/summaryengine.h>
#include <vespa/searchlib/common/packets.h>
#include <vespa/searchlib/transactionlog/trans_log_server_explorer.h>
@@ -36,13 +36,13 @@
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/net/state_server.h>
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/host_name.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/mmap_file_allocator_factory.h>
#include <vespa/vespalib/util/random.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
-#include <vespa/vespalib/util/invokeserviceimpl.h>
#ifdef __linux__
#include <malloc.h>
#endif
@@ -54,20 +54,21 @@
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.proton");
+using CpuCategory = vespalib::CpuUsage::Category;
+
using document::DocumentTypeRepo;
+using search::engine::MonitorReply;
+using search::transactionlog::DomainStats;
+using vespa::config::search::core::ProtonConfig;
+using vespa::config::search::core::internal::InternalProtonType;
+using vespalib::CpuUsage;
using vespalib::FileHeader;
using vespalib::IllegalStateException;
using vespalib::Slime;
+using vespalib::compression::CompressionConfig;
using vespalib::makeLambdaTask;
using vespalib::slime::ArrayInserter;
using vespalib::slime::Cursor;
-using CpuCategory = vespalib::CpuUsage::Category;
-
-using search::transactionlog::DomainStats;
-using vespa::config::search::core::ProtonConfig;
-using vespa::config::search::core::internal::InternalProtonType;
-using vespalib::compression::CompressionConfig;
-using search::engine::MonitorReply;
namespace proton {
@@ -140,8 +141,8 @@ struct MetricsUpdateHook : metrics::UpdateHook
const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component";
-VESPA_THREAD_STACK_TAG(initialize_executor)
-VESPA_THREAD_STACK_TAG(close_executor)
+VESPA_THREAD_STACK_TAG(proton_initialize_executor)
+VESPA_THREAD_STACK_TAG(proton_close_executor)
}
@@ -331,7 +332,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_compile_cache_executor_binding = vespalib::eval::CompileCache::bind(_shared_service->shared_raw());
InitializeThreads initializeThreads;
if (protonConfig.initialize.threads > 0) {
- initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki, initialize_executor);
+ initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128_Ki,
+ CpuUsage::wrap(proton_initialize_executor, CpuCategory::SETUP));
_initDocumentDbsInSequence = (protonConfig.initialize.threads == 1);
}
_protonConfigurer.applyInitialConfig(initializeThreads);
@@ -465,7 +467,8 @@ Proton::~Proton()
}
}
- vespalib::ThreadStackExecutor closePool(std::min(_documentDBMap.size(), numCores), 0x20000, close_executor);
+ vespalib::ThreadStackExecutor closePool(std::min(_documentDBMap.size(), numCores), 0x20000,
+ CpuUsage::wrap(proton_close_executor, CpuCategory::SETUP));
closeDocumentDBs(closePool);
}
_documentDBMap.clear();
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index 5a3eaa02d3b..7e799e506e3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -2,10 +2,12 @@
#include "shared_threading_service.h"
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
-#include <vespa/vespalib/util/invokeserviceimpl.h>
+
+using vespalib::CpuUsage;
VESPA_THREAD_STACK_TAG(proton_field_writer_executor)
VESPA_THREAD_STACK_TAG(proton_shared_executor)
@@ -16,7 +18,7 @@ namespace proton {
using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor;
SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg)
- : _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor),
+ : _warmup(cfg.warmup_threads(), 128_Ki, CpuUsage::wrap(proton_warmup_executor, CpuUsage::Category::COMPACT)),
_shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki,
cfg.shared_task_limit(), proton_shared_executor)),
_field_writer(),
@@ -25,7 +27,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
{
const auto& fw_cfg = cfg.field_writer_config();
if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) {
- _field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor,
+ _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE),
fw_cfg.indexingThreads() * 3,
fw_cfg.defaultTaskLimit(),
fw_cfg.is_task_limit_hard(),
diff --git a/vespalib/src/vespa/vespalib/util/cpu_usage.h b/vespalib/src/vespa/vespalib/util/cpu_usage.h
index 6fd1a370887..fd270d194b4 100644
--- a/vespalib/src/vespa/vespalib/util/cpu_usage.h
+++ b/vespalib/src/vespa/vespalib/util/cpu_usage.h
@@ -1,4 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
#include "runnable.h"
#include "executor.h"