diff options
author | Geir Storli <geirst@yahooinc.com> | 2022-01-28 15:56:29 +0000 |
---|---|---|
committer | Geir Storli <geirst@yahooinc.com> | 2022-01-28 15:56:29 +0000 |
commit | a0b919302dac06269b996e9d174850384da8e17d (patch) | |
tree | b6f2e1f0bc832c7f0d2eda04c37d23e9e55ad29d /searchlib | |
parent | d545fc9568ea84a76c2761731ce941a40b2274a7 (diff) |
Tag cpu category for tasks run in the proton shared executor.
Diffstat (limited to 'searchlib')
4 files changed, 16 insertions, 9 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp index 280800f1b58..c924937d343 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp @@ -5,9 +5,12 @@ #include "field_merger_task.h" #include "fusion_output_index.h" #include <vespa/searchcommon/common/schema.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/executor.h> #include <cassert> +using vespalib::CpuUsage; + namespace search::diskindex { FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token) @@ -66,7 +69,8 @@ FieldMergersState::wait_field_mergers_done() void FieldMergersState::schedule_task(FieldMerger& field_merger) { - auto rejected = _executor.execute(std::make_unique<FieldMergerTask>(field_merger, *this)); + auto task = std::make_unique<FieldMergerTask>(field_merger, *this); + auto rejected = _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); assert(!rejected); } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 7d8bd4bc799..41b4b3cb794 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -71,9 +71,9 @@ Fusion::Fusion(const Schema& schema, const vespalib::string& dir, Fusion::~Fusion() = default; bool -Fusion::mergeFields(vespalib::Executor & executor, std::shared_ptr<IFlushToken> flush_token) +Fusion::mergeFields(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token) { - FieldMergersState field_mergers_state(_fusion_out_index, executor, flush_token); + FieldMergersState field_mergers_state(_fusion_out_index, shared_executor, flush_token); const Schema &schema = getSchema(); for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) { auto& field_merger = field_mergers_state.alloc_field_merger(iter.getIndex()); @@ -103,7 +103,7 @@ Fusion::readSchemaFiles() } bool -Fusion::merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token) +Fusion::merge(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token) { FastOS_StatInfo statInfo; if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) { @@ -137,7 +137,7 @@ Fusion::merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_t if (!readSchemaFiles()) { throw IllegalArgumentException("Cannot read schema files for source indexes"); } - return mergeFields(executor, flush_token); + return mergeFields(shared_executor, flush_token); } catch (const std::exception & e) { LOG(error, "%s", e.what()); return false; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 04edf77ea81..d78bad097df 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -25,7 +25,7 @@ class Fusion private: using Schema = index::Schema; - bool mergeFields(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); + bool mergeFields(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token); bool readSchemaFiles(); bool checkSchemaCompat(); @@ -43,7 +43,7 @@ public: ~Fusion(); void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); } void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); } - bool merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); + bool merge(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token); }; } diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index 113883a307f..6dd630a6426 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -11,6 +11,7 @@ #include <vespa/searchlib/attribute/load_utils.h> #include <vespa/searchlib/attribute/readerbase.h> #include <vespa/vespalib/data/slime/inserter.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/memory_allocator.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> @@ -21,6 +22,7 @@ LOG_SETUP(".searchlib.tensor.dense_tensor_attribute"); using search::attribute::LoadUtils; +using vespalib::CpuUsage; using vespalib::eval::Value; using vespalib::eval::ValueType; using vespalib::slime::ObjectInserter; @@ -324,7 +326,7 @@ DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::En // Then we can issue a new one ++_pending; - _shared_executor.execute(vespalib::makeLambdaTask([this, ref, lid]() { + auto task = vespalib::makeLambdaTask([this, ref, lid]() { auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref), _attr.getGenerationHandler().takeGuard()); std::unique_lock guard(_mutex); @@ -332,7 +334,8 @@ DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::En if (_queue.size() == 1) { _cond.notify_all(); } - })); + }); + _shared_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP)); } class DenseTensorAttribute::ForegroundLoader : public Loader { public: |