summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-31 16:28:07 +0100
committerGitHub <noreply@github.com>2022-01-31 16:28:07 +0100
commit237f5a4edfdaa8d8ac7a4bdef9ea632ec5583cfe (patch)
tree5a15eff1ea63636a72921496467ca7f2e39516bf
parent50b2c0c0e99f47a36da61300c0e4ce935b8cb075 (diff)
parent82aa3ab6fca0d5c6f6ec4943cdc3155eb2cff463 (diff)
Merge pull request #20999 from vespa-engine/geirst/cpu-category-tag-document-store-tasks
Tag all document store tasks executed on the shared executor with cpu…
-rw-r--r--searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp10
-rw-r--r--searchlib/src/vespa/searchlib/docstore/compacter.cpp3
-rw-r--r--searchlib/src/vespa/searchlib/docstore/compacter.h2
-rw-r--r--searchlib/src/vespa/searchlib/docstore/filechunk.cpp9
-rw-r--r--searchlib/src/vespa/searchlib/docstore/filechunk.h8
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.cpp70
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.h13
-rw-r--r--searchlib/src/vespa/searchlib/docstore/storebybucket.cpp16
-rw-r--r--searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp43
-rw-r--r--searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h13
-rw-r--r--vespalib/src/vespa/vespalib/util/spin_lock.h2
11 files changed, 114 insertions, 75 deletions
diff --git a/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp b/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp
index 3b9f36d9f1f..b295291d7c4 100644
--- a/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp
+++ b/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp
@@ -1,23 +1,25 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/searchlib/common/fileheadercontext.h>
#include <vespa/searchlib/docstore/filechunk.h>
#include <vespa/searchlib/docstore/writeablefilechunk.h>
#include <vespa/searchlib/test/directory_handler.h>
#include <vespa/vespalib/test/insertion_operators.h>
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/cpu_usage.h>
+#include <vespa/vespalib/util/compressionconfig.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <iomanip>
#include <iostream>
#include <vespa/log/log.h>
-#include <vespa/vespalib/util/compressionconfig.h>
LOG_SETUP("file_chunk_test");
using namespace search;
using common::FileHeaderContext;
+using vespalib::CpuUsage;
using vespalib::ThreadStackExecutor;
struct MyFileHeaderContext : public FileHeaderContext {
@@ -136,12 +138,12 @@ struct WriteFixture : public FixtureBase {
dir.cleanup(dirCleanup);
}
void flush() {
- chunk.flush(true, serialNum);
+ chunk.flush(true, serialNum, CpuUsage::Category::WRITE);
chunk.flushPendingChunks(serialNum);
}
WriteFixture &append(uint32_t lid) {
vespalib::string data = getData(lid);
- chunk.append(nextSerialNum(), lid, data.c_str(), data.size());
+ chunk.append(nextSerialNum(), lid, data.c_str(), data.size(), CpuUsage::Category::WRITE);
return *this;
}
void updateLidMap(uint32_t docIdLimit) {
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.cpp b/searchlib/src/vespa/searchlib/docstore/compacter.cpp
index 3639b0a57d2..33ed84ff4d0 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.cpp
@@ -23,7 +23,8 @@ Compacter::write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *bu
_ds.write(std::move(guard), fileId, lid, buffer, sz);
}
-BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, const CompressionConfig & compression, LogDataStore & ds, Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination) :
+BucketCompacter::BucketCompacter(size_t maxSignificantBucketBits, const CompressionConfig & compression, LogDataStore & ds,
+ Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination) :
_unSignificantBucketBits((maxSignificantBucketBits > 8) ? (maxSignificantBucketBits - 8) : 0),
_sourceFileId(source),
_destinationFileId(destination),
diff --git a/searchlib/src/vespa/searchlib/docstore/compacter.h b/searchlib/src/vespa/searchlib/docstore/compacter.h
index cd4609ce33e..9c5775c0c4a 100644
--- a/searchlib/src/vespa/searchlib/docstore/compacter.h
+++ b/searchlib/src/vespa/searchlib/docstore/compacter.h
@@ -36,7 +36,7 @@ class BucketCompacter : public IWriteData, public StoreByBucket::IWrite
public:
using FileId = FileChunk::FileId;
BucketCompacter(size_t maxSignificantBucketBits, const CompressionConfig & compression, LogDataStore & ds,
- Executor & exeutor, const IBucketizer & bucketizer, FileId source, FileId destination);
+ Executor & executor, const IBucketizer & bucketizer, FileId source, FileId destination);
void write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override ;
void write(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override;
void close() override;
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
index 278fe166272..a56360d0b53 100644
--- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
@@ -21,6 +21,7 @@
#include <vespa/log/log.h>
LOG_SETUP(".search.filechunk");
+using vespalib::CpuUsage;
using vespalib::GenericHeader;
using vespalib::getErrorString;
@@ -336,7 +337,8 @@ appendChunks(FixedParams * args, Chunk::UP chunk)
void
FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest,
- uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress)
+ uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress,
+ vespalib::CpuUsage::Category cpu_category)
{
assert(frozen() || visitorProgress);
vespalib::GenerationHandler::Guard lidReadGuard(db.getLidReadGuard());
@@ -347,12 +349,13 @@ FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteDat
for (size_t chunkId(0); chunkId < numChunks; chunkId++) {
std::promise<Chunk::UP> promisedChunk;
std::future<Chunk::UP> futureChunk = promisedChunk.get_future();
- executor.execute(vespalib::makeLambdaTask([promise = std::move(promisedChunk), chunkId, this]() mutable {
+ auto task = vespalib::makeLambdaTask([promise = std::move(promisedChunk), chunkId, this]() mutable {
const ChunkInfo & cInfo(_chunkInfo[chunkId]);
vespalib::DataBuffer whole(0ul, ALIGNMENT);
FileRandRead::FSP keepAlive(_file->read(cInfo.getOffset(), whole, cInfo.getSize()));
promise.set_value(std::make_unique<Chunk>(chunkId, whole.getData(), whole.getDataLen()));
- }));
+ });
+ executor.execute(CpuUsage::wrap(std::move(task), cpu_category));
while (queue.size() >= limit) {
appendChunks(&fixedParams, queue.front().get());
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.h b/searchlib/src/vespa/searchlib/docstore/filechunk.h
index 0d669d9cfde..60c788cf9c7 100644
--- a/searchlib/src/vespa/searchlib/docstore/filechunk.h
+++ b/searchlib/src/vespa/searchlib/docstore/filechunk.h
@@ -7,10 +7,11 @@
#include "lid_info.h"
#include "randread.h"
#include <vespa/searchlib/common/tunefileinfo.h>
-#include <vespa/vespalib/util/memoryusage.h>
-#include <vespa/vespalib/util/ptrholder.h>
#include <vespa/vespalib/stllike/hash_map.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/generationhandler.h>
+#include <vespa/vespalib/util/memoryusage.h>
+#include <vespa/vespalib/util/ptrholder.h>
#include <vespa/vespalib/util/time.h>
class FastOS_FileInterface;
@@ -164,7 +165,8 @@ public:
const vespalib::string & getName() const { return _name; }
void compact(const IGetLid & iGetLid);
void appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest,
- uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress);
+ uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress,
+ vespalib::CpuUsage::Category cpu_category);
/**
* Must be called after chunk has been created to allow correct
* underlying file object to be created. Must be called before
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
index f2c4e12488a..2d62e1db6fd 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
@@ -3,10 +3,11 @@
#include "storebybucket.h"
#include "compacter.h"
#include "logdatastore.h"
-#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/util/benchmark_timer.h>
#include <vespa/vespalib/data/fileheader.h>
+#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/benchmark_timer.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/rcuvector.hpp>
#include <vespa/vespalib/util/size_literals.h>
@@ -22,17 +23,20 @@ namespace {
constexpr uint32_t DEFAULT_MAX_LIDS_PER_FILE = 32_Mi;
}
-using vespalib::getLastErrorString;
-using vespalib::getErrorString;
-using vespalib::GenerationHandler;
-using vespalib::make_string;
-using vespalib::IllegalStateException;
using common::FileHeaderContext;
-using std::runtime_error;
-using document::BucketId;
-using docstore::StoreByBucket;
using docstore::BucketCompacter;
+using docstore::StoreByBucket;
+using document::BucketId;
using namespace std::literals;
+using std::runtime_error;
+using vespalib::CpuUsage;
+using vespalib::GenerationHandler;
+using vespalib::IllegalStateException;
+using vespalib::getErrorString;
+using vespalib::getLastErrorString;
+using vespalib::make_string;
+
+using CpuCategory = CpuUsage::Category;
LogDataStore::Config::Config()
: _maxFileSize(DEFAULT_MAX_FILESIZE),
@@ -180,29 +184,30 @@ LogDataStore::write(uint64_t serialNum, uint32_t lid, const void * buffer, size_
{
std::unique_lock guard(_updateLock);
WriteableFileChunk & active = getActive(guard);
- write(std::move(guard), active, serialNum, lid, buffer, len);
+ write(std::move(guard), active, serialNum, lid, buffer, len, CpuCategory::WRITE);
}
void
LogDataStore::write(MonitorGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len)
{
auto & destination = static_cast<WriteableFileChunk &>(*_fileChunks[destinationFileId.getId()]);
- write(std::move(guard), destination, destination.getSerialNum(), lid, buffer, len);
+ write(std::move(guard), destination, destination.getSerialNum(), lid, buffer, len, CpuCategory::COMPACT);
}
void
LogDataStore::write(MonitorGuard guard, WriteableFileChunk & destination,
- uint64_t serialNum, uint32_t lid, const void * buffer, size_t len)
+ uint64_t serialNum, uint32_t lid, const void * buffer, size_t len,
+ CpuUsage::Category cpu_category)
{
- LidInfo lm = destination.append(serialNum, lid, buffer, len);
+ LidInfo lm = destination.append(serialNum, lid, buffer, len, cpu_category);
setLid(guard, lid, lm);
if (destination.getFileId() == getActiveFileId(guard)) {
- requireSpace(std::move(guard), destination);
+ requireSpace(std::move(guard), destination, cpu_category);
}
}
void
-LogDataStore::requireSpace(MonitorGuard guard, WriteableFileChunk & active)
+LogDataStore::requireSpace(MonitorGuard guard, WriteableFileChunk & active, CpuUsage::Category cpu_category)
{
assert(active.getFileId() == getActiveFileId(guard));
size_t oldSz(active.getDiskFootprint());
@@ -216,13 +221,13 @@ LogDataStore::requireSpace(MonitorGuard guard, WriteableFileChunk & active)
guard.unlock();
// Write chunks to old .dat file
// Note: Feed latency spike
- active.flush(true, active.getSerialNum());
+ active.flush(true, active.getSerialNum(), cpu_category);
// Sync transaction log
_tlSyncer.sync(active.getSerialNum());
// sync old active .dat file, write pending chunks to old .idx file
// and sync old .idx file to disk.
active.flushPendingChunks(active.getSerialNum());
- active.freeze();
+ active.freeze(cpu_category);
// TODO: Delay create of new file
LOG(debug, "Closed file %s of size %ld and %u lids due to maxsize of %ld or maxlids %u reached. Bloat is %ld",
active.getName().c_str(), active.getDiskFootprint(), active.getNumLids(),
@@ -278,7 +283,7 @@ LogDataStore::remove(uint64_t serialNum, uint32_t lid)
if (lm.valid()) {
_fileChunks[lm.getFileId()]->remove(lid, lm.size());
}
- lm = getActive(guard).append(serialNum, lid, nullptr, 0);
+ lm = getActive(guard).append(serialNum, lid, nullptr, 0, CpuCategory::WRITE);
assert( lm.empty() );
_lidInfo[lid] = lm;
}
@@ -311,7 +316,9 @@ LogDataStore::flush(uint64_t syncToken)
{
MonitorGuard guard(_updateLock);
// Note: Feed latency spike
- getActive(guard).flush(true, syncToken);
+ // This is executed by an IFlushTarget,
+ // but is a fundamental part of the WRITE pipeline of the data store.
+ getActive(guard).flush(true, syncToken, CpuCategory::WRITE);
active = &getActive(guard);
activeHolder = holdFileChunk(active->getFileId());
}
@@ -388,18 +395,21 @@ LogDataStore::compactWorst(uint64_t syncToken, bool compactDiskBloat) {
}
}
-SerialNum LogDataStore::flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken) {
+SerialNum LogDataStore::flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken,
+ CpuUsage::Category cpu_category)
+{
(void) guard;
uint64_t lastSerial(file.getSerialNum());
if (lastSerial > syncToken) {
syncToken = lastSerial;
}
- file.flush(false, syncToken);
+ file.flush(false, syncToken, cpu_category);
return syncToken;
}
void LogDataStore::flushFileAndWait(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken) {
- syncToken = flushFile(std::move(guard), file, syncToken);
+ // This function is always called in the context of compaction.
+ syncToken = flushFile(std::move(guard), file, syncToken, CpuCategory::COMPACT);
file.waitForDiskToCatchUpToNow();
_tlSyncer.sync(syncToken);
file.flushPendingChunks(syncToken);
@@ -408,7 +418,9 @@ void LogDataStore::flushFileAndWait(MonitorGuard guard, WriteableFileChunk & fil
SerialNum LogDataStore::flushActive(SerialNum syncToken) {
MonitorGuard guard(_updateLock);
WriteableFileChunk &active = getActive(guard);
- return flushFile(std::move(guard), active, syncToken);
+ // This is executed by an IFlushTarget (via initFlush),
+ // but is a fundamental part of the WRITE pipeline of the data store.
+ return flushFile(std::move(guard), active, syncToken, CpuCategory::WRITE);
}
void LogDataStore::flushActiveAndWait(SerialNum syncToken) {
@@ -450,7 +462,7 @@ void LogDataStore::compactFile(FileId fileId)
compacter = std::make_unique<docstore::Compacter>(*this);
}
- fc->appendTo(_executor, *this, *compacter, fc->getNumChunks(), nullptr);
+ fc->appendTo(_executor, *this, *compacter, fc->getNumChunks(), nullptr, CpuCategory::COMPACT);
if (destinationFileId.isActive()) {
flushActiveAndWait(0);
@@ -458,7 +470,7 @@ void LogDataStore::compactFile(FileId fileId)
MonitorGuard guard(_updateLock);
auto & compactTo = dynamic_cast<WriteableFileChunk &>(*_fileChunks[destinationFileId.getId()]);
flushFileAndWait(std::move(guard), compactTo, 0);
- compactTo.freeze();
+ compactTo.freeze(CpuCategory::COMPACT);
}
compacter.reset();
@@ -1071,7 +1083,9 @@ LogDataStore::accept(IDataStoreVisitor &visitor,
WrapVisitorProgress wrapProgress(visitorProgress, totalChunks);
for (FileId fcId : fileChunks) {
FileChunk & fc = *_fileChunks[fcId.getId()];
- fc.appendTo(_executor, *this, wrap, fc.getNumChunks(), &wrapProgress);
+ // accept() is used when reprocessing all documents stored (e.g. when adding attribute to a field).
+ // We tag this work as WRITE, as the alternative to reprocessing would be to re-feed the data.
+ fc.appendTo(_executor, *this, wrap, fc.getNumChunks(), &wrapProgress, CpuCategory::WRITE);
if (prune) {
internalFlushAll();
FileChunk::UP toDie;
@@ -1082,7 +1096,7 @@ LogDataStore::accept(IDataStoreVisitor &visitor,
toDie->erase();
}
}
- lfc.appendTo(_executor, *this, wrap, lastChunks, &wrapProgress);
+ lfc.appendTo(_executor, *this, wrap, lastChunks, &wrapProgress, CpuCategory::WRITE);
if (prune) {
internalFlushAll();
}
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.h b/searchlib/src/vespa/searchlib/docstore/logdatastore.h
index 14d7b856e96..fc985c57ce4 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.h
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.h
@@ -5,12 +5,13 @@
#include "idatastore.h"
#include "lid_info.h"
#include "writeablefilechunk.h"
-#include <vespa/vespalib/util/compressionconfig.h>
#include <vespa/searchcommon/common/growstrategy.h>
#include <vespa/searchlib/common/tunefileinfo.h>
#include <vespa/searchlib/transactionlog/syncproxy.h>
-#include <vespa/vespalib/util/rcuvector.h>
+#include <vespa/vespalib/util/compressionconfig.h>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/rcuvector.h>
#include <set>
@@ -114,7 +115,8 @@ public:
const Config & getConfig() const { return _config; }
Config & getConfig() { return _config; }
- void write(MonitorGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid, const void * buffer, size_t len);
+ void write(MonitorGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid,
+ const void * buffer, size_t len, vespalib::CpuUsage::Category cpu_category);
void write(MonitorGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len);
/**
@@ -215,7 +217,7 @@ private:
vespalib::string createDatFileName(NameId id) const;
vespalib::string createIdxFileName(NameId id) const;
- void requireSpace(MonitorGuard guard, WriteableFileChunk & active);
+ void requireSpace(MonitorGuard guard, WriteableFileChunk & active, vespalib::CpuUsage::Category cpu_category);
bool isReadOnly() const { return _readOnly; }
void updateSerialNum();
@@ -232,7 +234,8 @@ private:
*/
void unholdFileChunk(FileId fileId);
- SerialNum flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken);
+ SerialNum flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken,
+ vespalib::CpuUsage::Category cpu_category);
SerialNum flushActive(SerialNum syncToken);
void flushActiveAndWait(SerialNum syncToken);
void flushFileAndWait(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken);
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
index 37732fe46cb..651ff111f4e 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
@@ -1,14 +1,16 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "storebybucket.h"
-#include <vespa/vespalib/util/lambdatask.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/data/databuffer.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/util/cpu_usage.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <algorithm>
namespace search::docstore {
using document::BucketId;
+using vespalib::CpuUsage;
using vespalib::makeLambdaTask;
StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executor, const CompressionConfig & compression) noexcept
@@ -35,9 +37,10 @@ StoreByBucket::add(BucketId bucketId, uint32_t chunkId, uint32_t lid, const void
Chunk::UP tmpChunk = createChunk();
_current.swap(tmpChunk);
incChunksPosted();
- _executor.execute(makeLambdaTask([this, chunk=std::move(tmpChunk)]() mutable {
+ auto task = makeLambdaTask([this, chunk=std::move(tmpChunk)]() mutable {
closeChunk(std::move(chunk));
- }));
+ });
+ _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
}
Index idx(bucketId, _current->getId(), chunkId, lid);
_current->append(lid, buffer, sz);
@@ -88,9 +91,10 @@ void
StoreByBucket::drain(IWrite & drainer)
{
incChunksPosted();
- _executor.execute(makeLambdaTask([this, chunk=std::move(_current)]() mutable {
+ auto task = makeLambdaTask([this, chunk=std::move(_current)]() mutable {
closeChunk(std::move(chunk));
- }));
+ });
+ _executor.execute(CpuUsage::wrap(std::move(task), CpuUsage::Category::COMPACT));
waitAllProcessed();
std::vector<Chunk::UP> chunks;
chunks.resize(_chunks.size());
diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
index 82ae0c02e18..78b247ffd46 100644
--- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
@@ -10,19 +10,21 @@
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/array.hpp>
+#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/log/log.h>
LOG_SETUP(".search.writeablefilechunk");
-using vespalib::makeLambdaTask;
+using search::common::FileHeaderContext;
+using vespalib::CpuUsage;
using vespalib::FileHeader;
+using vespalib::GenerationHandler;
+using vespalib::IllegalHeaderException;
+using vespalib::makeLambdaTask;
using vespalib::make_string;
using vespalib::nbostream;
-using vespalib::IllegalHeaderException;
-using vespalib::GenerationHandler;
-using search::common::FileHeaderContext;
namespace search {
@@ -161,9 +163,9 @@ WriteableFileChunk::~WriteableFileChunk()
{
if (!frozen()) {
if (_active->size() || _active->count()) {
- flush(true, _serialNum);
+ flush(true, _serialNum, CpuUsage::Category::WRITE);
}
- freeze();
+ freeze(CpuUsage::Category::WRITE);
}
// This is a wild stab at fixing bug 6348143.
// If it works it indicates something bad with the filesystem.
@@ -188,9 +190,10 @@ WriteableFileChunk::updateLidMap(const unique_lock &guard, ISetLid &ds, uint64_t
}
void
-WriteableFileChunk::restart(uint32_t nextChunkId)
+WriteableFileChunk::restart(uint32_t nextChunkId, CpuUsage::Category cpu_category)
{
- _executor.execute(makeLambdaTask([this, nextChunkId] {fileWriter(nextChunkId);}));
+ auto task = makeLambdaTask([this, nextChunkId] {fileWriter(nextChunkId);});
+ _executor.execute(CpuUsage::wrap(std::move(task), cpu_category));
}
namespace {
@@ -282,7 +285,7 @@ WriteableFileChunk::read(uint32_t lid, SubChunkId chunkId, vespalib::DataBuffer
}
void
-WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum)
+WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum, CpuUsage::Category cpu_category)
{
Chunk * active(nullptr);
{
@@ -305,11 +308,11 @@ WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum)
std::lock_guard innerGuard(_lock);
setDiskFootprint(FileChunk::getDiskFootprint() + tmp->getBuf().getDataLen());
}
- enque(std::move(tmp));
+ enque(std::move(tmp), cpu_category);
}
void
-WriteableFileChunk::enque(ProcessedChunkUP tmp)
+WriteableFileChunk::enque(ProcessedChunkUP tmp, CpuUsage::Category cpu_category)
{
LOG(debug, "enqueing %p", tmp.get());
std::unique_lock guard(_writeMonitor);
@@ -319,7 +322,7 @@ WriteableFileChunk::enque(ProcessedChunkUP tmp)
uint32_t nextChunkId = _firstChunkIdToBeWritten;
guard.unlock();
_writeCond.notify_one();
- restart(nextChunkId);
+ restart(nextChunkId, cpu_category);
} else {
_writeCond.notify_one();
}
@@ -563,11 +566,11 @@ WriteableFileChunk::getModificationTime() const
}
void
-WriteableFileChunk::freeze()
+WriteableFileChunk::freeze(CpuUsage::Category cpu_category)
{
if (!frozen()) {
waitForAllChunksFlushedToDisk();
- enque(ProcessedChunkUP());
+ enque(ProcessedChunkUP(), cpu_category);
{
std::unique_lock guard(_writeMonitor);
while (_writeTaskIsRunning) {
@@ -665,12 +668,15 @@ int32_t WriteableFileChunk::flushLastIfNonEmpty(bool force)
}
void
-WriteableFileChunk::flush(bool block, uint64_t syncToken)
+WriteableFileChunk::flush(bool block, uint64_t syncToken, CpuUsage::Category cpu_category)
{
int32_t chunkId = flushLastIfNonEmpty(syncToken > _serialNum);
if (chunkId >= 0) {
setSerialNum(syncToken);
- _executor.execute(makeLambdaTask([this, chunkId, serialNum=_serialNum] { internalFlush(chunkId, serialNum); }));
+ auto task = makeLambdaTask([this, chunkId, serialNum=_serialNum, cpu_category] {
+ internalFlush(chunkId, serialNum, cpu_category);
+ });
+ _executor.execute(CpuUsage::wrap(std::move(task), cpu_category));
} else {
if (block) {
std::lock_guard guard(_lock);
@@ -716,11 +722,12 @@ WriteableFileChunk::waitForAllChunksFlushedToDisk() const
}
LidInfo
-WriteableFileChunk::append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len)
+WriteableFileChunk::append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len,
+ CpuUsage::Category cpu_category)
{
assert( !frozen() );
if ( ! _active->hasRoom(len)) {
- flush(false, _serialNum);
+ flush(false, _serialNum, cpu_category);
}
assert(serialNum >= _serialNum);
_serialNum = serialNum;
diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
index f1b69a5f1f9..846a9a9035e 100644
--- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
+++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
@@ -53,13 +53,14 @@ public:
ssize_t read(uint32_t lid, SubChunkId chunk, vespalib::DataBuffer & buffer) const override;
void read(LidInfoWithLidV::const_iterator begin, size_t count, IBufferVisitor & visitor) const override;
- LidInfo append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len);
- void flush(bool block, uint64_t syncToken);
+ LidInfo append(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len,
+ vespalib::CpuUsage::Category cpu_category);
+ void flush(bool block, uint64_t syncToken, vespalib::CpuUsage::Category cpu_category);
uint64_t getSerialNum() const { return _serialNum; }
void setSerialNum(uint64_t serialNum) { _serialNum = std::max(_serialNum, serialNum); }
vespalib::system_time getModificationTime() const override;
- void freeze();
+ void freeze(vespalib::CpuUsage::Category cpu_category);
size_t getDiskFootprint() const override;
size_t getMemoryFootprint() const override;
size_t getMemoryMetaFootprint() const override;
@@ -80,11 +81,11 @@ private:
void waitForChunkFlushedToDisk(uint32_t chunkId) const;
void waitForAllChunksFlushedToDisk() const;
void fileWriter(const uint32_t firstChunkId);
- void internalFlush(uint32_t, uint64_t serialNum);
- void enque(ProcessedChunkUP);
+ void internalFlush(uint32_t, uint64_t serialNum, vespalib::CpuUsage::Category cpu_category);
+ void enque(ProcessedChunkUP, vespalib::CpuUsage::Category cpu_category);
int32_t flushLastIfNonEmpty(bool force);
// _writeMonitor should not be held when calling restart
- void restart(uint32_t nextChunkId);
+ void restart(uint32_t nextChunkId, vespalib::CpuUsage::Category cpu_category);
ProcessedChunkQ drainQ(unique_lock & guard);
void readDataHeader();
void readIdxHeader(FastOS_FileInterface & idxFile);
diff --git a/vespalib/src/vespa/vespalib/util/spin_lock.h b/vespalib/src/vespa/vespalib/util/spin_lock.h
index abc2b89106f..3af7bc0fd55 100644
--- a/vespalib/src/vespa/vespalib/util/spin_lock.h
+++ b/vespalib/src/vespa/vespalib/util/spin_lock.h
@@ -1,5 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
#include <atomic>
#include <thread>