From a0b919302dac06269b996e9d174850384da8e17d Mon Sep 17 00:00:00 2001 From: Geir Storli Date: Fri, 28 Jan 2022 15:56:29 +0000 Subject: Tag cpu category for tasks run in the proton shared executor. --- searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp | 6 +++++- searchlib/src/vespa/searchlib/diskindex/fusion.cpp | 8 ++++---- searchlib/src/vespa/searchlib/diskindex/fusion.h | 4 ++-- searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp | 7 +++++-- 4 files changed, 16 insertions(+), 9 deletions(-) (limited to 'searchlib') diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp index 280800f1b58..c924937d343 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp @@ -5,9 +5,12 @@ #include "field_merger_task.h" #include "fusion_output_index.h" #include +#include #include #include +using vespalib::CpuUsage; + namespace search::diskindex { FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::Executor& executor, std::shared_ptr flush_token) @@ -66,7 +69,8 @@ FieldMergersState::wait_field_mergers_done() void FieldMergersState::schedule_task(FieldMerger& field_merger) { - auto rejected = _executor.execute(std::make_unique(field_merger, *this)); + auto task = std::make_unique(field_merger, *this); + auto rejected = _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); assert(!rejected); } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 7d8bd4bc799..41b4b3cb794 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -71,9 +71,9 @@ Fusion::Fusion(const Schema& schema, const vespalib::string& dir, Fusion::~Fusion() = default; bool -Fusion::mergeFields(vespalib::Executor & executor, std::shared_ptr flush_token) +Fusion::mergeFields(vespalib::Executor& shared_executor, std::shared_ptr flush_token) { - FieldMergersState field_mergers_state(_fusion_out_index, executor, flush_token); + FieldMergersState field_mergers_state(_fusion_out_index, shared_executor, flush_token); const Schema &schema = getSchema(); for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) { auto& field_merger = field_mergers_state.alloc_field_merger(iter.getIndex()); @@ -103,7 +103,7 @@ Fusion::readSchemaFiles() } bool -Fusion::merge(vespalib::Executor& executor, std::shared_ptr flush_token) +Fusion::merge(vespalib::Executor& shared_executor, std::shared_ptr flush_token) { FastOS_StatInfo statInfo; if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) { @@ -137,7 +137,7 @@ Fusion::merge(vespalib::Executor& executor, std::shared_ptr flush_t if (!readSchemaFiles()) { throw IllegalArgumentException("Cannot read schema files for source indexes"); } - return mergeFields(executor, flush_token); + return mergeFields(shared_executor, flush_token); } catch (const std::exception & e) { LOG(error, "%s", e.what()); return false; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 04edf77ea81..d78bad097df 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -25,7 +25,7 @@ class Fusion private: using Schema = index::Schema; - bool mergeFields(vespalib::Executor& executor, std::shared_ptr flush_token); + bool mergeFields(vespalib::Executor& shared_executor, std::shared_ptr flush_token); bool readSchemaFiles(); bool checkSchemaCompat(); @@ -43,7 +43,7 @@ public: ~Fusion(); void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); } void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); } - bool merge(vespalib::Executor& executor, std::shared_ptr flush_token); + bool merge(vespalib::Executor& shared_executor, std::shared_ptr flush_token); }; } diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index 113883a307f..6dd630a6426 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -21,6 +22,7 @@ LOG_SETUP(".searchlib.tensor.dense_tensor_attribute"); using search::attribute::LoadUtils; +using vespalib::CpuUsage; using vespalib::eval::Value; using vespalib::eval::ValueType; using vespalib::slime::ObjectInserter; @@ -324,7 +326,7 @@ DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::En // Then we can issue a new one ++_pending; - _shared_executor.execute(vespalib::makeLambdaTask([this, ref, lid]() { + auto task = vespalib::makeLambdaTask([this, ref, lid]() { auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref), _attr.getGenerationHandler().takeGuard()); std::unique_lock guard(_mutex); @@ -332,7 +334,8 @@ DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::En if (_queue.size() == 1) { _cond.notify_all(); } - })); + }); + _shared_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP)); } class DenseTensorAttribute::ForegroundLoader : public Loader { public: -- cgit v1.2.3