diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-28 18:28:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-28 18:28:10 +0100 |
commit | bd19f4a9fdeb4ea5e7a64364d5336fec2141d3ce (patch) | |
tree | f85877909d0b23d98259bec1ead69c1a41e485b7 /searchlib | |
parent | 363d584663c1d84425b9e701b8564d0a92cd4a56 (diff) | |
parent | b6d8ba500409c59c446e24e0253952ecc57af95c (diff) |
Merge pull request #20980 from vespa-engine/geirst/tag-tasks-and-threads-with-cpu-category
Tag tasks and threads with cpu category
Diffstat (limited to 'searchlib')
5 files changed, 27 insertions, 18 deletions
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp index 280800f1b58..c924937d343 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp @@ -5,9 +5,12 @@ #include "field_merger_task.h" #include "fusion_output_index.h" #include <vespa/searchcommon/common/schema.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/executor.h> #include <cassert> +using vespalib::CpuUsage; + namespace search::diskindex { FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token) @@ -66,7 +69,8 @@ FieldMergersState::wait_field_mergers_done() void FieldMergersState::schedule_task(FieldMerger& field_merger) { - auto rejected = _executor.execute(std::make_unique<FieldMergerTask>(field_merger, *this)); + auto task = std::make_unique<FieldMergerTask>(field_merger, *this); + auto rejected = _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); assert(!rejected); } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 7d8bd4bc799..41b4b3cb794 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -71,9 +71,9 @@ Fusion::Fusion(const Schema& schema, const vespalib::string& dir, Fusion::~Fusion() = default; bool -Fusion::mergeFields(vespalib::Executor & executor, std::shared_ptr<IFlushToken> flush_token) +Fusion::mergeFields(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token) { - FieldMergersState field_mergers_state(_fusion_out_index, executor, flush_token); + FieldMergersState field_mergers_state(_fusion_out_index, shared_executor, flush_token); const Schema &schema = getSchema(); for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) { auto& field_merger = field_mergers_state.alloc_field_merger(iter.getIndex()); @@ -103,7 +103,7 @@ Fusion::readSchemaFiles() } bool -Fusion::merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token) +Fusion::merge(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token) { FastOS_StatInfo statInfo; if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) { @@ -137,7 +137,7 @@ Fusion::merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_t if (!readSchemaFiles()) { throw IllegalArgumentException("Cannot read schema files for source indexes"); } - return mergeFields(executor, flush_token); + return mergeFields(shared_executor, flush_token); } catch (const std::exception & e) { LOG(error, "%s", e.what()); return false; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 04edf77ea81..d78bad097df 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -25,7 +25,7 @@ class Fusion private: using Schema = index::Schema; - bool mergeFields(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); + bool mergeFields(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token); bool readSchemaFiles(); bool checkSchemaCompat(); @@ -43,7 +43,7 @@ public: ~Fusion(); void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); } void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); } - bool merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); + bool merge(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token); }; } diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index 113883a307f..6dd630a6426 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -11,6 +11,7 @@ #include <vespa/searchlib/attribute/load_utils.h> #include <vespa/searchlib/attribute/readerbase.h> #include <vespa/vespalib/data/slime/inserter.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/memory_allocator.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> @@ -21,6 +22,7 @@ LOG_SETUP(".searchlib.tensor.dense_tensor_attribute"); using search::attribute::LoadUtils; +using vespalib::CpuUsage; using vespalib::eval::Value; using vespalib::eval::ValueType; using vespalib::slime::ObjectInserter; @@ -324,7 +326,7 @@ DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::En // Then we can issue a new one ++_pending; - _shared_executor.execute(vespalib::makeLambdaTask([this, ref, lid]() { + auto task = vespalib::makeLambdaTask([this, ref, lid]() { auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref), _attr.getGenerationHandler().takeGuard()); std::unique_lock guard(_mutex); @@ -332,7 +334,8 @@ DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::En if (_queue.size() == 1) { _cond.notify_all(); } - })); + }); + _shared_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP)); } class DenseTensorAttribute::ForegroundLoader : public Loader { public: diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index db2cf2a255d..ccef92c0802 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -2,27 +2,29 @@ #include "translogserver.h" #include "domain.h" #include "client_common.h" -#include <vespa/vespalib/util/destructor_callbacks.h> -#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/transport.h> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/cpu_usage.h> +#include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/fnet/frt/supervisor.h> -#include <vespa/fnet/frt/rpcrequest.h> -#include <vespa/fnet/transport.h> +#include <vespa/vespalib/util/stringfmt.h> #include <fstream> #include <thread> #include <vespa/log/log.h> LOG_SETUP(".transactionlog.server"); -using vespalib::make_string; -using vespalib::stringref; -using vespalib::IllegalArgumentException; using search::common::FileHeaderContext; using std::make_shared; using std::runtime_error; +using vespalib::CpuUsage; +using vespalib::IllegalArgumentException; +using vespalib::make_string; +using vespalib::stringref; using namespace std::chrono_literals; namespace search::transactionlog { @@ -97,7 +99,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con _name(name), _baseDir(baseDir), _domainConfig(cfg), - _executor(maxThreads, 128_Ki, tls_executor), + _executor(maxThreads, 128_Ki, CpuUsage::wrap(tls_executor, CpuUsage::Category::WRITE)), _threadPool(std::make_unique<FastOS_ThreadPool>(120_Ki)), _transport(std::make_unique<FNET_Transport>()), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), |