From 82aa3ab6fca0d5c6f6ec4943cdc3155eb2cff463 Mon Sep 17 00:00:00 2001 From: Geir Storli Date: Mon, 31 Jan 2022 14:45:09 +0000 Subject: Tag all document store tasks executed on the shared executor with cpu category. --- .../tests/docstore/file_chunk/file_chunk_test.cpp | 10 ++-- .../src/vespa/searchlib/docstore/compacter.cpp | 3 +- searchlib/src/vespa/searchlib/docstore/compacter.h | 2 +- .../src/vespa/searchlib/docstore/filechunk.cpp | 9 ++- searchlib/src/vespa/searchlib/docstore/filechunk.h | 8 ++- .../src/vespa/searchlib/docstore/logdatastore.cpp | 70 +++++++++++++--------- .../src/vespa/searchlib/docstore/logdatastore.h | 13 ++-- .../src/vespa/searchlib/docstore/storebybucket.cpp | 16 +++-- .../searchlib/docstore/writeablefilechunk.cpp | 43 +++++++------ .../vespa/searchlib/docstore/writeablefilechunk.h | 13 ++-- 10 files changed, 112 insertions(+), 75 deletions(-) (limited to 'searchlib') diff --git a/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp b/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp index 3b9f36d9f1f..b295291d7c4 100644 --- a/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp +++ b/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp @@ -1,23 +1,25 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include #include #include #include #include #include +#include +#include +#include #include #include #include #include -#include LOG_SETUP("file_chunk_test"); using namespace search; using common::FileHeaderContext; +using vespalib::CpuUsage; using vespalib::ThreadStackExecutor; struct MyFileHeaderContext : public FileHeaderContext { @@ -136,12 +138,12 @@ struct WriteFixture : public FixtureBase { dir.cleanup(dirCleanup); } void flush() { - chunk.flush(true, serialNum); + chunk.flush(true, serialNum, CpuUsage::Category::WRITE); chunk.flushPendingChunks(serialNum); } WriteFixture &append(uint32_t lid) { vespalib::string data = getData(lid); - chunk.append(nextSerialNum(), lid, data.c_str(), data.size()); + chunk.append(nextSerialNum(), lid, data.c_str(), data.size(), CpuUsage::Category::WRITE); return *this; } void updateLidMap(uint32_t docIdLimit) { diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.cpp b/searchlib/src/vespa/searchlib/docstore/compacter.cpp index 26fb79f8a4e..2fd04489c45 100644 --- a/searchlib/src/vespa/searchlib/docstore/compacter.cpp +++ b/searchlib/src/vespa/searchlib/docstore/compacter.cpp @@ -18,7 +18,8 @@ Compacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *bu _ds.write(std::move(guard), fileId, lid, buffer, sz); } -BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, const CompressionConfig & compression, LogDataStore & ds, Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination) : +BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, const CompressionConfig & compression, LogDataStore & ds, + Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination) : _unSignificantBucketBits((maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0), _sourceFileId(source), _destinationFileId(destination), diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.h b/searchlib/src/vespa/searchlib/docstore/compacter.h index cd4609ce33e..9c5775c0c4a 100644 --- a/searchlib/src/vespa/searchlib/docstore/compacter.h +++ b/searchlib/src/vespa/searchlib/docstore/compacter.h @@ -36,7 +36,7 @@ class BucketCompacter : public IWriteData, public StoreByBucket::IWrite public: using FileId = FileChunk::FileId; BucketCompacter(size_t maxSignificantBucketBits, const CompressionConfig & compression, LogDataStore & ds, - Executor & exeutor, const IBucketizer & bucketizer, FileId source, FileId destination); + Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination); void write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override ; void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override; void close() override; diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp index 278fe166272..a56360d0b53 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp @@ -21,6 +21,7 @@ #include LOG_SETUP(".search.filechunk"); +using vespalib::CpuUsage; using vespalib::GenericHeader; using vespalib::getErrorString; @@ -336,7 +337,8 @@ appendChunks(FixedParams * args, Chunk::UP chunk) void FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest, - uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress) + uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress, + vespalib::CpuUsage::Category cpu_category) { assert(frozen() || visitorProgress); vespalib::GenerationHandler::Guard lidReadGuard(db.getLidReadGuard()); @@ -347,12 +349,13 @@ FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteDat for (size_t chunkId(0); chunkId < numChunks; chunkId++) { std::promise promisedChunk; std::future futureChunk = promisedChunk.get_future(); - executor.execute(vespalib::makeLambdaTask([promise = std::move(promisedChunk), chunkId, this]() mutable { + auto task = vespalib::makeLambdaTask([promise = std::move(promisedChunk), chunkId, this]() mutable { const ChunkInfo & cInfo(_chunkInfo[chunkId]); vespalib::DataBuffer whole(0ul, ALIGNMENT); FileRandRead::FSP keepAlive(_file->read(cInfo.getOffset(), whole, cInfo.getSize())); promise.set_value(std::make_unique(chunkId, whole.getData(), whole.getDataLen())); - })); + }); + executor.execute(CpuUsage::wrap(std::move(task), cpu_category)); while (queue.size() >= limit) { appendChunks(&fixedParams, queue.front().get()); diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.h b/searchlib/src/vespa/searchlib/docstore/filechunk.h index 0d669d9cfde..60c788cf9c7 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.h +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.h @@ -7,10 +7,11 @@ #include "lid_info.h" #include "randread.h" #include -#include -#include #include +#include #include +#include +#include #include class FastOS_FileInterface; @@ -164,7 +165,8 @@ public: const vespalib::string & getName() const { return _name; } void compact(const IGetLid & iGetLid); void appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest, - uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress); + uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress, + vespalib::CpuUsage::Category cpu_category); /** * Must be called after chunk has been created to allow correct * underlying file object to be created. Must be called before diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp index f2c4e12488a..2d62e1db6fd 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp @@ -3,10 +3,11 @@ #include "storebybucket.h" #include "compacter.h" #include "logdatastore.h" -#include -#include #include +#include #include +#include +#include #include #include #include @@ -22,17 +23,20 @@ namespace { constexpr uint32_t DEFAULT_MAX_LIDS_PER_FILE = 32_Mi; } -using vespalib::getLastErrorString; -using vespalib::getErrorString; -using vespalib::GenerationHandler; -using vespalib::make_string; -using vespalib::IllegalStateException; using common::FileHeaderContext; -using std::runtime_error; -using document::BucketId; -using docstore::StoreByBucket; using docstore::BucketCompacter; +using docstore::StoreByBucket; +using document::BucketId; using namespace std::literals; +using std::runtime_error; +using vespalib::CpuUsage; +using vespalib::GenerationHandler; +using vespalib::IllegalStateException; +using vespalib::getErrorString; +using vespalib::getLastErrorString; +using vespalib::make_string; + +using CpuCategory = CpuUsage::Category; LogDataStore::Config::Config() : _maxFileSize(DEFAULT_MAX_FILESIZE), @@ -180,29 +184,30 @@ LogDataStore::write(uint64_t serialNum, uint32_t lid, const void * buffer, size_ { std::unique_lock guard(_updateLock); WriteableFileChunk & active = getActive(guard); - write(std::move(guard), active, serialNum, lid, buffer, len); + write(std::move(guard), active, serialNum, lid, buffer, len, CpuCategory::WRITE); } void LogDataStore::write(MonitorGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len) { auto & destination = static_cast(*_fileChunks[destinationFileId.getId()]); - write(std::move(guard), destination, destination.getSerialNum(), lid, buffer, len); + write(std::move(guard), destination, destination.getSerialNum(), lid, buffer, len, CpuCategory::COMPACT); } void LogDataStore::write(MonitorGuard guard, WriteableFileChunk & destination, - uint64_t serialNum, uint32_t lid, const void * buffer, size_t len) + uint64_t serialNum, uint32_t lid, const void * buffer, size_t len, + CpuUsage::Category cpu_category) { - LidInfo lm = destination.append(serialNum, lid, buffer, len); + LidInfo lm = destination.append(serialNum, lid, buffer, len, cpu_category); setLid(guard, lid, lm); if (destination.getFileId() == getActiveFileId(guard)) { - requireSpace(std::move(guard), destination); + requireSpace(std::move(guard), destination, cpu_category); } } void -LogDataStore::requireSpace(MonitorGuard guard, WriteableFileChunk & active) +LogDataStore::requireSpace(MonitorGuard guard, WriteableFileChunk & active, CpuUsage::Category cpu_category) { assert(active.getFileId() == getActiveFileId(guard)); size_t oldSz(active.getDiskFootprint()); @@ -216,13 +221,13 @@ LogDataStore::requireSpace(MonitorGuard guard, WriteableFileChunk & active) guard.unlock(); // Write chunks to old .dat file // Note: Feed latency spike - active.flush(true, active.getSerialNum()); + active.flush(true, active.getSerialNum(), cpu_category); // Sync transaction log _tlSyncer.sync(active.getSerialNum()); // sync old active .dat file, write pending chunks to old .idx file // and sync old .idx file to disk. active.flushPendingChunks(active.getSerialNum()); - active.freeze(); + active.freeze(cpu_category); // TODO: Delay create of new file LOG(debug, "Closed file %s of size %ld and %u lids due to maxsize of %ld or maxlids %u reached. Bloat is %ld", active.getName().c_str(), active.getDiskFootprint(), active.getNumLids(), @@ -278,7 +283,7 @@ LogDataStore::remove(uint64_t serialNum, uint32_t lid) if (lm.valid()) { _fileChunks[lm.getFileId()]->remove(lid, lm.size()); } - lm = getActive(guard).append(serialNum, lid, nullptr, 0); + lm = getActive(guard).append(serialNum, lid, nullptr, 0, CpuCategory::WRITE); assert( lm.empty() ); _lidInfo[lid] = lm; } @@ -311,7 +316,9 @@ LogDataStore::flush(uint64_t syncToken) { MonitorGuard guard(_updateLock); // Note: Feed latency spike - getActive(guard).flush(true, syncToken); + // This is executed by an IFlushTarget, + // but is a fundamental part of the WRITE pipeline of the data store. + getActive(guard).flush(true, syncToken, CpuCategory::WRITE); active = &getActive(guard); activeHolder = holdFileChunk(active->getFileId()); } @@ -388,18 +395,21 @@ LogDataStore::compactWorst(uint64_t syncToken, bool compactDiskBloat) { } } -SerialNum LogDataStore::flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken) { +SerialNum LogDataStore::flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken, + CpuUsage::Category cpu_category) +{ (void) guard; uint64_t lastSerial(file.getSerialNum()); if (lastSerial > syncToken) { syncToken = lastSerial; } - file.flush(false, syncToken); + file.flush(false, syncToken, cpu_category); return syncToken; } void LogDataStore::flushFileAndWait(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken) { - syncToken = flushFile(std::move(guard), file, syncToken); + // This function is always called in the context of compaction. + syncToken = flushFile(std::move(guard), file, syncToken, CpuCategory::COMPACT); file.waitForDiskToCatchUpToNow(); _tlSyncer.sync(syncToken); file.flushPendingChunks(syncToken); @@ -408,7 +418,9 @@ void LogDataStore::flushFileAndWait(MonitorGuard guard, WriteableFileChunk & fil SerialNum LogDataStore::flushActive(SerialNum syncToken) { MonitorGuard guard(_updateLock); WriteableFileChunk &active = getActive(guard); - return flushFile(std::move(guard), active, syncToken); + // This is executed by an IFlushTarget (via initFlush), + // but is a fundamental part of the WRITE pipeline of the data store. + return flushFile(std::move(guard), active, syncToken, CpuCategory::WRITE); } void LogDataStore::flushActiveAndWait(SerialNum syncToken) { @@ -450,7 +462,7 @@ void LogDataStore::compactFile(FileId fileId) compacter = std::make_unique(*this); } - fc->appendTo(_executor, *this, *compacter, fc->getNumChunks(), nullptr); + fc->appendTo(_executor, *this, *compacter, fc->getNumChunks(), nullptr, CpuCategory::COMPACT); if (destinationFileId.isActive()) { flushActiveAndWait(0); @@ -458,7 +470,7 @@ void LogDataStore::compactFile(FileId fileId) MonitorGuard guard(_updateLock); auto & compactTo = dynamic_cast(*_fileChunks[destinationFileId.getId()]); flushFileAndWait(std::move(guard), compactTo, 0); - compactTo.freeze(); + compactTo.freeze(CpuCategory::COMPACT); } compacter.reset(); @@ -1071,7 +1083,9 @@ LogDataStore::accept(IDataStoreVisitor &visitor, WrapVisitorProgress wrapProgress(visitorProgress, totalChunks); for (FileId fcId : fileChunks) { FileChunk & fc = *_fileChunks[fcId.getId()]; - fc.appendTo(_executor, *this, wrap, fc.getNumChunks(), &wrapProgress); + // accept() is used when reprocessing all documents stored (e.g. when adding attribute to a field). + // We tag this work as WRITE, as the alternative to reprocessing would be to re-feed the data. + fc.appendTo(_executor, *this, wrap, fc.getNumChunks(), &wrapProgress, CpuCategory::WRITE); if (prune) { internalFlushAll(); FileChunk::UP toDie; @@ -1082,7 +1096,7 @@ LogDataStore::accept(IDataStoreVisitor &visitor, toDie->erase(); } } - lfc.appendTo(_executor, *this, wrap, lastChunks, &wrapProgress); + lfc.appendTo(_executor, *this, wrap, lastChunks, &wrapProgress, CpuCategory::WRITE); if (prune) { internalFlushAll(); } diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.h b/searchlib/src/vespa/searchlib/docstore/logdatastore.h index 14d7b856e96..fc985c57ce4 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.h +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.h @@ -5,12 +5,13 @@ #include "idatastore.h" #include "lid_info.h" #include "writeablefilechunk.h" -#include #include #include #include -#include +#include +#include #include +#include #include @@ -114,7 +115,8 @@ public: const Config & getConfig() const { return _config; } Config & getConfig() { return _config; } - void write(MonitorGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid, const void * buffer, size_t len); + void write(MonitorGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid, + const void * buffer, size_t len, vespalib::CpuUsage::Category cpu_category); void write(MonitorGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len); /** @@ -215,7 +217,7 @@ private: vespalib::string createDatFileName(NameId id) const; vespalib::string createIdxFileName(NameId id) const; - void requireSpace(MonitorGuard guard, WriteableFileChunk & active); + void requireSpace(MonitorGuard guard, WriteableFileChunk & active, vespalib::CpuUsage::Category cpu_category); bool isReadOnly() const { return _readOnly; } void updateSerialNum(); @@ -232,7 +234,8 @@ private: */ void unholdFileChunk(FileId fileId); - SerialNum flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken); + SerialNum flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken, + vespalib::CpuUsage::Category cpu_category); SerialNum flushActive(SerialNum syncToken); void flushActiveAndWait(SerialNum syncToken); void flushFileAndWait(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken); diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp index 37732fe46cb..651ff111f4e 100644 --- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp +++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp @@ -1,14 +1,16 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storebybucket.h" -#include -#include #include +#include +#include +#include #include namespace search::docstore { using document::BucketId; +using vespalib::CpuUsage; using vespalib::makeLambdaTask; StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, const CompressionConfig & compression) noexcept @@ -35,9 +37,10 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void Chunk::UP tmpChunk = createChunk(); _current.swap(tmpChunk); incChunksPosted(); - _executor.execute(makeLambdaTask([this, chunk=std::move(tmpChunk)]() mutable { + auto task = makeLambdaTask([this, chunk=std::move(tmpChunk)]() mutable { closeChunk(std::move(chunk)); - })); + }); + _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); } Index idx(bucketId, _current->getId(), chunkId, lid); _current->append(lid, buffer, sz); @@ -88,9 +91,10 @@ void StoreByBucket::drain(IWrite & drainer) { incChunksPosted(); - _executor.execute(makeLambdaTask([this, chunk=std::move(_current)]() mutable { + auto task = makeLambdaTask([this, chunk=std::move(_current)]() mutable { closeChunk(std::move(chunk)); - })); + }); + _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT)); waitAllProcessed(); std::vector chunks; chunks.resize(_chunks.size()); diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp index 82ae0c02e18..78b247ffd46 100644 --- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp @@ -10,19 +10,21 @@ #include #include #include +#include #include #include #include LOG_SETUP(".search.writeablefilechunk"); -using vespalib::makeLambdaTask; +using search::common::FileHeaderContext; +using vespalib::CpuUsage; using vespalib::FileHeader; +using vespalib::GenerationHandler; +using vespalib::IllegalHeaderException; +using vespalib::makeLambdaTask; using vespalib::make_string; using vespalib::nbostream; -using vespalib::IllegalHeaderException; -using vespalib::GenerationHandler; -using search::common::FileHeaderContext; namespace search { @@ -161,9 +163,9 @@ WriteableFileChunk::~WriteableFileChunk() { if (!frozen()) { if (_active->size() || _active->count()) { - flush(true, _serialNum); + flush(true, _serialNum, CpuUsage::Category::WRITE); } - freeze(); + freeze(CpuUsage::Category::WRITE); } // This is a wild stab at fixing bug 6348143. // If it works it indicates something bad with the filesystem. @@ -188,9 +190,10 @@ WriteableFileChunk::updateLidMap(const unique_lock &guard, ISetLid &ds, uint64_t } void -WriteableFileChunk::restart(uint32_t nextChunkId) +WriteableFileChunk::restart(uint32_t nextChunkId, CpuUsage::Category cpu_category) { - _executor.execute(makeLambdaTask([this, nextChunkId] {fileWriter(nextChunkId);})); + auto task = makeLambdaTask([this, nextChunkId] {fileWriter(nextChunkId);}); + _executor.execute(CpuUsage::wrap(std::move(task), cpu_category)); } namespace { @@ -282,7 +285,7 @@ WriteableFileChunk::read(uint32_t lid, SubChunkId chunkId, vespalib::DataBuffer } void -WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum) +WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum, CpuUsage::Category cpu_category) { Chunk * active(nullptr); { @@ -305,11 +308,11 @@ WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum) std::lock_guard innerGuard(_lock); setDiskFootprint(FileChunk::getDiskFootprint() + tmp->getBuf().getDataLen()); } - enque(std::move(tmp)); + enque(std::move(tmp), cpu_category); } void -WriteableFileChunk::enque(ProcessedChunkUP tmp) +WriteableFileChunk::enque(ProcessedChunkUP tmp, CpuUsage::Category cpu_category) { LOG(debug, "enqueing %p", tmp.get()); std::unique_lock guard(_writeMonitor); @@ -319,7 +322,7 @@ WriteableFileChunk::enque(ProcessedChunkUP tmp) uint32_t nextChunkId = _firstChunkIdToBeWritten; guard.unlock(); _writeCond.notify_one(); - restart(nextChunkId); + restart(nextChunkId, cpu_category); } else { _writeCond.notify_one(); } @@ -563,11 +566,11 @@ WriteableFileChunk::getModificationTime() const } void -WriteableFileChunk::freeze() +WriteableFileChunk::freeze(CpuUsage::Category cpu_category) { if (!frozen()) { waitForAllChunksFlushedToDisk(); - enque(ProcessedChunkUP()); + enque(ProcessedChunkUP(), cpu_category); { std::unique_lock guard(_writeMonitor); while (_writeTaskIsRunning) { @@ -665,12 +668,15 @@ int32_t WriteableFileChunk::flushLastIfNonEmpty(bool force) } void -WriteableFileChunk::flush(bool block, uint64_t syncToken) +WriteableFileChunk::flush(bool block, uint64_t syncToken, CpuUsage::Category cpu_category) { int32_t chunkId = flushLastIfNonEmpty(syncToken > _serialNum); if (chunkId >= 0) { setSerialNum(syncToken); - _executor.execute(makeLambdaTask([this, chunkId, serialNum=_serialNum] { internalFlush(chunkId, serialNum); })); + auto task = makeLambdaTask([this, chunkId, serialNum=_serialNum, cpu_category] { + internalFlush(chunkId, serialNum, cpu_category); + }); + _executor.execute(CpuUsage::wrap(std::move(task), cpu_category)); } else { if (block) { std::lock_guard guard(_lock); @@ -716,11 +722,12 @@ WriteableFileChunk::waitForAllChunksFlushedToDisk() const } LidInfo -WriteableFileChunk::append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len) +WriteableFileChunk::append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len, + CpuUsage::Category cpu_category) { assert( !frozen() ); if ( ! _active->hasRoom(len)) { - flush(false, _serialNum); + flush(false, _serialNum, cpu_category); } assert(serialNum >= _serialNum); _serialNum = serialNum; diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h index f1b69a5f1f9..846a9a9035e 100644 --- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h +++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h @@ -53,13 +53,14 @@ public: ssize_t read(uint32_t lid, SubChunkId chunk, vespalib::DataBuffer & buffer) const override; void read(LidInfoWithLidV::const_iterator begin, size_t count, IBufferVisitor & visitor) const override; - LidInfo append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len); - void flush(bool block, uint64_t syncToken); + LidInfo append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len, + vespalib::CpuUsage::Category cpu_category); + void flush(bool block, uint64_t syncToken, vespalib::CpuUsage::Category cpu_category); uint64_t getSerialNum() const { return _serialNum; } void setSerialNum(uint64_t serialNum) { _serialNum = std::max(_serialNum, serialNum); } vespalib::system_time getModificationTime() const override; - void freeze(); + void freeze(vespalib::CpuUsage::Category cpu_category); size_t getDiskFootprint() const override; size_t getMemoryFootprint() const override; size_t getMemoryMetaFootprint() const override; @@ -80,11 +81,11 @@ private: void waitForChunkFlushedToDisk(uint32_t chunkId) const; void waitForAllChunksFlushedToDisk() const; void fileWriter(const uint32_t firstChunkId); - void internalFlush(uint32_t, uint64_t serialNum); - void enque(ProcessedChunkUP); + void internalFlush(uint32_t, uint64_t serialNum, vespalib::CpuUsage::Category cpu_category); + void enque(ProcessedChunkUP, vespalib::CpuUsage::Category cpu_category); int32_t flushLastIfNonEmpty(bool force); // _writeMonitor should not be held when calling restart - void restart(uint32_t nextChunkId); + void restart(uint32_t nextChunkId, vespalib::CpuUsage::Category cpu_category); ProcessedChunkQ drainQ(unique_lock & guard); void readDataHeader(); void readIdxHeader(FastOS_FileInterface & idxFile); -- cgit v1.2.3