diff options
author | Geir Storli <geirst@yahooinc.com> | 2022-01-28 15:56:29 +0000 |
---|---|---|
committer | Geir Storli <geirst@yahooinc.com> | 2022-01-28 15:56:29 +0000 |
commit | a0b919302dac06269b996e9d174850384da8e17d (patch) | |
tree | b6f2e1f0bc832c7f0d2eda04c37d23e9e55ad29d | |
parent | d545fc9568ea84a76c2761731ce941a40b2274a7 (diff) |
Tag cpu category for tasks run in the proton shared executor.
13 files changed, 56 insertions, 44 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp index 66be0737fe9..713582bf1ce 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.cpp @@ -193,7 +193,7 @@ AttributeInitializer::loadAttribute(const AttributeVectorSP &attr, assert(attr->hasLoadData()); vespalib::Timer timer; EventLogger::loadAttributeStart(_documentSubDbName, attr->getName()); - if (!attr->load(&_executor)) { + if (!attr->load(&_shared_executor)) { LOG(warning, "Could not load attribute vector '%s' from disk. Returning empty attribute vector", attr->getBaseFileName().c_str()); return false; @@ -235,13 +235,13 @@ AttributeInitializer::AttributeInitializer(const std::shared_ptr<AttributeDirect const AttributeSpec &spec, uint64_t currentSerialNum, const IAttributeFactory &factory, - vespalib::Executor & executor) + vespalib::Executor& shared_executor) : _attrDir(attrDir), _documentSubDbName(documentSubDbName), _spec(spec), _currentSerialNum(currentSerialNum), _factory(factory), - _executor(executor), + _shared_executor(shared_executor), _header(), _header_ok(false) { diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h index 7b5d968353a..78a798c929e 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_initializer.h @@ -30,7 +30,7 @@ private: const AttributeSpec _spec; const uint64_t _currentSerialNum; const IAttributeFactory &_factory; - vespalib::Executor &_executor; + vespalib::Executor &_shared_executor; std::unique_ptr<const search::attribute::AttributeHeader> _header; bool _header_ok; @@ -48,7 +48,7 @@ private: public: AttributeInitializer(const std::shared_ptr<AttributeDirectory> &attrDir, const vespalib::string &documentSubDbName, const AttributeSpec &spec, uint64_t currentSerialNum, const IAttributeFactory &factory, - vespalib::Executor & executor); + vespalib::Executor& shared_executor); ~AttributeInitializer(); AttributeInitializerResult init() const; diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index b3b5f2486fa..f0a23f1038e 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -14,6 +14,7 @@ #include <vespa/searchlib/attribute/imported_attribute_vector.h> #include <vespa/searchlib/tensor/prepare_result.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/idestructorcallback.h> @@ -28,6 +29,7 @@ using namespace search; using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; using search::attribute::ImportedAttributeVector; using search::tensor::PrepareResult; +using vespalib::CpuUsage; using vespalib::GateCallback; using vespalib::ISequencedTaskExecutor; @@ -630,7 +632,7 @@ AttributeWriter::internalPut(SerialNum serialNum, const Document &doc, DocumentI wc.consider_build_field_paths(doc); auto prepare_task = std::make_unique<PreparePutTask>(serialNum, lid, wc, doc); auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone); - _shared_executor.execute(std::move(prepare_task)); + _shared_executor.execute(CpuUsage::wrap(std::move(prepare_task), CpuUsage::Category::WRITE)); _attributeFieldWriter.executeTask(wc.getExecutorId(), std::move(complete_task)); } else { if (allAttributes || wc.hasStructFieldAttribute()) { @@ -780,7 +782,7 @@ AttributeWriter::update(SerialNum serialNum, const DocumentUpdate &upd, Document auto complete_task = std::make_unique<CompletePutTask>(*prepare_task, onWriteDone); LOG(debug, "About to handle assign update as two phase put for docid %u in attribute vector '%s'", lid, attrp->getName().c_str()); - _shared_executor.execute(std::move(prepare_task)); + _shared_executor.execute(CpuUsage::wrap(std::move(prepare_task), CpuUsage::Category::WRITE)); _attributeFieldWriter.executeTask(itr->second.executor_id, std::move(complete_task)); } else { args[itr->second.executor_id.getId()]->_updates.emplace_back(attrp, &fupd); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h index 76aa6a7df64..beea59b5350 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.h @@ -31,14 +31,14 @@ class ShrinkLidSpaceFlushTarget; class AttributeManager : public proton::IAttributeManager { private: - typedef search::attribute::Config Config; - typedef search::SerialNum SerialNum; - typedef AttributeCollectionSpec Spec; + using AttributeReadGuard = search::attribute::AttributeReadGuard; + using AttributeVectorSP = std::shared_ptr<search::AttributeVector>; + using Config = search::attribute::Config; using FlushableAttributeSP = std::shared_ptr<FlushableAttribute>; - using ShrinkerSP = std::shared_ptr<ShrinkLidSpaceFlushTarget>; using IFlushTargetSP = std::shared_ptr<searchcorespi::IFlushTarget>; - using AttributeVectorSP = std::shared_ptr<search::AttributeVector>; - using AttributeReadGuard = search::attribute::AttributeReadGuard; + using SerialNum = search::SerialNum; + using ShrinkerSP = std::shared_ptr<ShrinkLidSpaceFlushTarget>; + using Spec = AttributeCollectionSpec; class AttributeWrap { @@ -67,8 +67,8 @@ private: const ShrinkerSP &getShrinker() const { return _shrinker; } }; - typedef vespalib::hash_map<vespalib::string, AttributeWrap> AttributeMap; - typedef vespalib::hash_map<vespalib::string, FlushableWrap> FlushableMap; + using AttributeMap = vespalib::hash_map<vespalib::string, AttributeWrap>; + using FlushableMap = vespalib::hash_map<vespalib::string, FlushableWrap>; AttributeMap _attributes; FlushableMap _flushables; @@ -100,7 +100,7 @@ private: void transferExtraAttributes(const AttributeManager &currMgr); public: - typedef std::shared_ptr<AttributeManager> SP; + using SP = std::shared_ptr<AttributeManager>; AttributeManager(const vespalib::string &baseDir, const vespalib::string &documentSubDbName, diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp index 6b938c4cc73..a04f5bfa651 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.cpp @@ -145,7 +145,7 @@ SummaryManager::createSummarySetup(const SummaryConfig & summaryCfg, const Summa juniperCfg, attributeMgr, _docStore, repo); } -SummaryManager::SummaryManager(vespalib::Executor & executor, const LogDocumentStore::Config & storeConfig, +SummaryManager::SummaryManager(vespalib::Executor &shared_executor, const LogDocumentStore::Config & storeConfig, const search::GrowStrategy & growStrategy, const vespalib::string &baseDir, const DocTypeName &docTypeName, const TuneFileSummary &tuneFileSummary, const FileHeaderContext &fileHeaderContext, search::transactionlog::SyncProxy &tlSyncer, @@ -154,7 +154,7 @@ SummaryManager::SummaryManager(vespalib::Executor & executor, const LogDocumentS _docTypeName(docTypeName), _docStore() { - _docStore = std::make_shared<LogDocumentStore>(executor, baseDir, storeConfig, growStrategy, tuneFileSummary, + _docStore = std::make_shared<LogDocumentStore>(shared_executor, baseDir, storeConfig, growStrategy, tuneFileSummary, fileHeaderContext, tlSyncer, std::move(bucketizer)); } diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h index f65db787f9d..ba55761d091 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanager.h @@ -59,7 +59,7 @@ private: public: typedef std::shared_ptr<SummaryManager> SP; - SummaryManager(vespalib::Executor & executor, + SummaryManager(vespalib::Executor &shared_executor, const search::LogDocumentStore::Config & summary, const search::GrowStrategy & growStrategy, const vespalib::string &baseDir, diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp index 5d3c3548113..568fa740d6f 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.cpp @@ -11,7 +11,7 @@ SummaryManagerInitializer(const search::GrowStrategy &grow, const vespalib::string & baseDir, const vespalib::string &subDbName, const DocTypeName &docTypeName, - vespalib::Executor &summaryExecutor, + vespalib::Executor &shared_executor, const search::LogDocumentStore::Config & storeCfg, const search::TuneFileSummary &tuneFile, const search::common::FileHeaderContext &fileHeaderContext, @@ -23,7 +23,7 @@ SummaryManagerInitializer(const search::GrowStrategy &grow, _baseDir(baseDir), _subDbName(subDbName), _docTypeName(docTypeName), - _summaryExecutor(summaryExecutor), + _shared_executor(shared_executor), _storeCfg(storeCfg), _tuneFile(tuneFile), _fileHeaderContext(fileHeaderContext), @@ -41,7 +41,7 @@ SummaryManagerInitializer::run() vespalib::Timer timer; EventLogger::loadDocumentStoreStart(_subDbName); *_result = std::make_shared<SummaryManager> - (_summaryExecutor, _storeCfg, _grow, _baseDir, _docTypeName, + (_shared_executor, _storeCfg, _grow, _baseDir, _docTypeName, _tuneFile, _fileHeaderContext, _tlSyncer, _bucketizer); EventLogger::loadDocumentStoreComplete(_subDbName, timer.elapsed()); } diff --git a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h index ec1016dd044..318edc425a7 100644 --- a/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h +++ b/searchcore/src/vespa/searchcore/proton/docsummary/summarymanagerinitializer.h @@ -19,7 +19,7 @@ class SummaryManagerInitializer : public initializer::InitializerTask const vespalib::string _baseDir; const vespalib::string _subDbName; const DocTypeName _docTypeName; - vespalib::Executor &_summaryExecutor; + vespalib::Executor &_shared_executor; const search::LogDocumentStore::Config _storeCfg; const search::TuneFileSummary _tuneFile; const search::common::FileHeaderContext &_fileHeaderContext; @@ -35,7 +35,7 @@ public: const vespalib::string & baseDir, const vespalib::string &subDbName, const DocTypeName &docTypeName, - vespalib::Executor & summaryExecutor, + vespalib::Executor &shared_executor, const search::LogDocumentStore::Config & storeCfg, const search::TuneFileSummary &tuneFile, const search::common::FileHeaderContext & fileHeaderContext, diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index cdc6df1fdf2..4d992866590 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -15,9 +15,10 @@ #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> #include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/destructor_callbacks.h> -#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/log/log.h> LOG_SETUP(".proton.server.storeonlyfeedview"); @@ -25,18 +26,19 @@ LOG_SETUP(".proton.server.storeonlyfeedview"); using document::BucketId; using document::Document; using document::DocumentId; -using document::GlobalId; using document::DocumentTypeRepo; using document::DocumentUpdate; -using vespalib::IDestructorCallback; +using document::GlobalId; +using proton::documentmetastore::LidReuseDelayer; using search::SerialNum; using search::index::Schema; using storage::spi::BucketInfoResult; using storage::spi::Timestamp; +using vespalib::CpuUsage; +using vespalib::IDestructorCallback; using vespalib::IllegalStateException; using vespalib::makeLambdaTask; using vespalib::make_string; -using proton::documentmetastore::LidReuseDelayer; namespace proton { @@ -449,13 +451,14 @@ StoreOnlyFeedView::internalUpdate(FeedToken token, const UpdateOperation &updOp) } else { putSummaryNoop(std::move(futureStream), onWriteDone); } - _writeService.shared().execute(makeLambdaTask( - [upd = updOp.getUpdate(), useDocStore, lid, onWriteDone, promisedDoc = std::move(promisedDoc), - promisedStream = std::move(promisedStream), this]() mutable - { - makeUpdatedDocument(useDocStore, lid, *upd, onWriteDone, - std::move(promisedDoc), std::move(promisedStream)); - })); + auto task = makeLambdaTask([upd = updOp.getUpdate(), useDocStore, lid, onWriteDone, + promisedDoc = std::move(promisedDoc), + promisedStream = std::move(promisedStream), this]() mutable + { + makeUpdatedDocument(useDocStore, lid, *upd, onWriteDone, + std::move(promisedDoc), std::move(promisedStream)); + }); + _writeService.shared().execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::WRITE)); updateAttributes(serialNum, lid, std::move(futureDoc), onWriteDone); } } diff --git a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp index 280800f1b58..c924937d343 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/field_mergers_state.cpp @@ -5,9 +5,12 @@ #include "field_merger_task.h" #include "fusion_output_index.h" #include <vespa/searchcommon/common/schema.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/executor.h> #include <cassert> +using vespalib::CpuUsage; + namespace search::diskindex { FieldMergersState::FieldMergersState(const FusionOutputIndex& fusion_out_index, vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token) @@ -66,7 +69,8 @@ FieldMergersState::wait_field_mergers_done() void FieldMergersState::schedule_task(FieldMerger& field_merger) { - auto rejected = _executor.execute(std::make_unique<FieldMergerTask>(field_merger, *this)); + auto task = std::make_unique<FieldMergerTask>(field_merger, *this); + auto rejected = _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); assert(!rejected); } diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp index 7d8bd4bc799..41b4b3cb794 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.cpp @@ -71,9 +71,9 @@ Fusion::Fusion(const Schema& schema, const vespalib::string& dir, Fusion::~Fusion() = default; bool -Fusion::mergeFields(vespalib::Executor & executor, std::shared_ptr<IFlushToken> flush_token) +Fusion::mergeFields(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token) { - FieldMergersState field_mergers_state(_fusion_out_index, executor, flush_token); + FieldMergersState field_mergers_state(_fusion_out_index, shared_executor, flush_token); const Schema &schema = getSchema(); for (SchemaUtil::IndexIterator iter(schema); iter.isValid(); ++iter) { auto& field_merger = field_mergers_state.alloc_field_merger(iter.getIndex()); @@ -103,7 +103,7 @@ Fusion::readSchemaFiles() } bool -Fusion::merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token) +Fusion::merge(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token) { FastOS_StatInfo statInfo; if (!FastOS_File::Stat(_fusion_out_index.get_path().c_str(), &statInfo)) { @@ -137,7 +137,7 @@ Fusion::merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_t if (!readSchemaFiles()) { throw IllegalArgumentException("Cannot read schema files for source indexes"); } - return mergeFields(executor, flush_token); + return mergeFields(shared_executor, flush_token); } catch (const std::exception & e) { LOG(error, "%s", e.what()); return false; diff --git a/searchlib/src/vespa/searchlib/diskindex/fusion.h b/searchlib/src/vespa/searchlib/diskindex/fusion.h index 04edf77ea81..d78bad097df 100644 --- a/searchlib/src/vespa/searchlib/diskindex/fusion.h +++ b/searchlib/src/vespa/searchlib/diskindex/fusion.h @@ -25,7 +25,7 @@ class Fusion private: using Schema = index::Schema; - bool mergeFields(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); + bool mergeFields(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token); bool readSchemaFiles(); bool checkSchemaCompat(); @@ -43,7 +43,7 @@ public: ~Fusion(); void set_dynamic_k_pos_index_format(bool dynamic_k_pos_index_format) { _fusion_out_index.set_dynamic_k_pos_index_format(dynamic_k_pos_index_format); } void set_force_small_merge_chunk(bool force_small_merge_chunk) { _fusion_out_index.set_force_small_merge_chunk(force_small_merge_chunk); } - bool merge(vespalib::Executor& executor, std::shared_ptr<IFlushToken> flush_token); + bool merge(vespalib::Executor& shared_executor, std::shared_ptr<IFlushToken> flush_token); }; } diff --git a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp index 113883a307f..6dd630a6426 100644 --- a/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp +++ b/searchlib/src/vespa/searchlib/tensor/dense_tensor_attribute.cpp @@ -11,6 +11,7 @@ #include <vespa/searchlib/attribute/load_utils.h> #include <vespa/searchlib/attribute/readerbase.h> #include <vespa/vespalib/data/slime/inserter.h> +#include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/memory_allocator.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> @@ -21,6 +22,7 @@ LOG_SETUP(".searchlib.tensor.dense_tensor_attribute"); using search::attribute::LoadUtils; +using vespalib::CpuUsage; using vespalib::eval::Value; using vespalib::eval::ValueType; using vespalib::slime::ObjectInserter; @@ -324,7 +326,7 @@ DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::En // Then we can issue a new one ++_pending; - _shared_executor.execute(vespalib::makeLambdaTask([this, ref, lid]() { + auto task = vespalib::makeLambdaTask([this, ref, lid]() { auto prepared = _attr._index->prepare_add_document(lid, _attr._denseTensorStore.get_typed_cells(ref), _attr.getGenerationHandler().takeGuard()); std::unique_lock guard(_mutex); @@ -332,7 +334,8 @@ DenseTensorAttribute::ThreadedLoader::load(uint32_t lid, vespalib::datastore::En if (_queue.size() == 1) { _cond.notify_all(); } - })); + }); + _shared_executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::SETUP)); } class DenseTensorAttribute::ForegroundLoader : public Loader { public: |