diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-14 12:38:17 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-14 13:06:17 +0000 |
commit | 749dc5565a6a99ec3323ffb366dcf60435cc8d21 (patch) | |
tree | 54e9967999a02c48a12a92aaea34573822cdb87f /searchlib | |
parent | 03a7e30be66cf799c4e0b703444f45249ebd7880 (diff) |
Use std::mutex/std::condition_variable
Diffstat (limited to 'searchlib')
10 files changed, 139 insertions, 136 deletions
diff --git a/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp b/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp index 1e99221551f..c1321920e3b 100644 --- a/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp +++ b/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp @@ -29,7 +29,7 @@ struct MyFileHeaderContext : public FileHeaderContext { struct SetLidObserver : public ISetLid { std::vector<uint32_t> lids; - void setLid(const LockGuard &guard, uint32_t lid, const LidInfo &lidInfo) override { + void setLid(const unique_lock &guard, uint32_t lid, const LidInfo &lidInfo) override { (void) guard; (void) lidInfo; lids.push_back(lid); diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp index d66e178717c..408286cc8b9 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp @@ -161,7 +161,7 @@ FileChunk::erase() } size_t -FileChunk::updateLidMap(const LockGuard &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit) +FileChunk::updateLidMap(const unique_lock &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit) { size_t sz(0); assert(_chunkInfo.empty()); diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.h b/searchlib/src/vespa/searchlib/docstore/filechunk.h index 3febb51ca69..580e3a80e02 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.h +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.h @@ -72,7 +72,7 @@ private: class FileChunk { public: - using LockGuard = std::unique_lock<std::mutex>; + using unique_lock = std::unique_lock<std::mutex>; class NameId { public: explicit NameId(size_t id) noexcept : _id(id) { } @@ -109,7 +109,7 @@ public: const IBucketizer *bucketizer, bool skipCrcOnRead); virtual ~FileChunk(); - virtual size_t updateLidMap(const LockGuard &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit); + virtual size_t updateLidMap(const unique_lock &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit); virtual ssize_t read(uint32_t lid, SubChunkId chunk, vespalib::DataBuffer & buffer) const; virtual void read(LidInfoWithLidV::const_iterator begin, size_t count, IBufferVisitor & visitor) const; void remove(uint32_t lid, uint32_t size); diff --git a/searchlib/src/vespa/searchlib/docstore/lid_info.h b/searchlib/src/vespa/searchlib/docstore/lid_info.h index 152246631a7..c30e851a07c 100644 --- a/searchlib/src/vespa/searchlib/docstore/lid_info.h +++ b/searchlib/src/vespa/searchlib/docstore/lid_info.h @@ -68,20 +68,20 @@ typedef std::vector<LidInfoWithLid> LidInfoWithLidV; class ISetLid { public: - using LockGuard = std::unique_lock<std::mutex>; + using unique_lock = std::unique_lock<std::mutex>; virtual ~ISetLid() = default; - virtual void setLid(const LockGuard & guard, uint32_t lid, const LidInfo & lm) = 0; + virtual void setLid(const unique_lock & guard, uint32_t lid, const LidInfo & lm) = 0; }; class IGetLid { public: using Guard = vespalib::GenerationHandler::Guard; - using LockGuard = std::unique_lock<std::mutex>; + using unique_lock = std::unique_lock<std::mutex>; virtual ~IGetLid() = default; virtual LidInfo getLid(const Guard & guard, uint32_t lid) const = 0; - virtual LockGuard getLidGuard(uint32_t lid) const = 0; + virtual unique_lock getLidGuard(uint32_t lid) const = 0; virtual Guard getLidReadGuard() const = 0; }; diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp index 37510afc572..76c672b0d9a 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp @@ -21,7 +21,6 @@ namespace { constexpr uint32_t DEFAULT_MAX_LIDS_PER_FILE = 32 * 1024 * 1024; } -using vespalib::LockGuard; using vespalib::getLastErrorString; using vespalib::getErrorString; using vespalib::GenerationHandler; @@ -96,7 +95,7 @@ void LogDataStore::reconfigure(const Config & config) { void LogDataStore::updateSerialNum() { - LockGuard guard(_updateLock); + std::unique_lock guard(_updateLock); if (getPrevActive(guard) != nullptr) { if (getActive(guard).getSerialNum() < getPrevActive(guard)->getLastPersistedSerialNum()) { @@ -117,7 +116,7 @@ void LogDataStore::updateLidMap(uint32_t lastFileChunkDocIdLimit) { uint64_t lastSerialNum(0); - LockGuard guard(_updateLock); + std::unique_lock guard(_updateLock); for (size_t i = 0; i < _fileChunks.size(); ++i) { FileChunk::UP &chunk = _fileChunks[i]; bool lastChunk = ((i + 1) == _fileChunks.size()); @@ -180,20 +179,20 @@ LogDataStore::read(uint32_t lid, vespalib::DataBuffer& buffer) const void LogDataStore::write(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len) { - LockGuard guard(_updateLock); + std::unique_lock guard(_updateLock); WriteableFileChunk & active = getActive(guard); write(std::move(guard), active, serialNum, lid, buffer, len); } void -LogDataStore::write(LockGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len) +LogDataStore::write(MonitorGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len) { auto & destination = static_cast<WriteableFileChunk &>(*_fileChunks[destinationFileId.getId()]); write(std::move(guard), destination, destination.getSerialNum(), lid, buffer, len); } void -LogDataStore::write(LockGuard guard, WriteableFileChunk & destination, +LogDataStore::write(MonitorGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid, const void * buffer, size_t len) { LidInfo lm = destination.append(serialNum, lid, buffer, len); @@ -204,7 +203,7 @@ LogDataStore::write(LockGuard guard, WriteableFileChunk & destination, } void -LogDataStore::requireSpace(LockGuard guard, WriteableFileChunk & active) +LogDataStore::requireSpace(MonitorGuard guard, WriteableFileChunk & active) { assert(active.getFileId() == getActiveFileId(guard)); size_t oldSz(active.getDiskFootprint()); @@ -235,7 +234,7 @@ LogDataStore::requireSpace(LockGuard guard, WriteableFileChunk & active) uint64_t LogDataStore::lastSyncToken() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); uint64_t lastSerial(getActive(guard).getLastPersistedSerialNum()); if (lastSerial == 0) { const FileChunk * prev = getPrevActive(guard); @@ -249,7 +248,7 @@ LogDataStore::lastSyncToken() const uint64_t LogDataStore::tentativeLastSyncToken() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); return getActive(guard).getSerialNum(); } @@ -259,7 +258,7 @@ LogDataStore::getLastFlushTime() const if (lastSyncToken() == 0) { return vespalib::system_time(); } - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); vespalib::system_time timeStamp(getActive(guard).getModificationTime()); if (timeStamp == vespalib::system_time()) { const FileChunk * prev = getPrevActive(guard); @@ -274,7 +273,7 @@ LogDataStore::getLastFlushTime() const void LogDataStore::remove(uint64_t serialNum, uint32_t lid) { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); if (lid < getDocIdLimit()) { LidInfo lm = _lidInfo[lid]; if (lm.valid()) { @@ -343,7 +342,7 @@ LogDataStore::flush(uint64_t syncToken) std::unique_ptr<FileChunkHolder> activeHolder; assert(syncToken == _initFlushSyncToken); { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); // Note: Feed latency spike getActive(guard).flush(true, syncToken); active = &getActive(guard); @@ -368,7 +367,7 @@ double LogDataStore::getMaxBucketSpread() const { double maxSpread(1.0); - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); for (const FileChunk::UP & fc : _fileChunks) { if (fc) { if (_bucketizer && fc->frozen()) { @@ -385,7 +384,7 @@ LogDataStore::findNextToCompact(double bloatLimit, double spreadLimit, bool prio typedef std::multimap<double, FileId, std::greater<double>> CostMap; CostMap worstBloat; CostMap worstSpread; - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); for (size_t i(0); i < _fileChunks.size(); i++) { const FileChunk::UP & fc(_fileChunks[i]); if (fc && fc->frozen() && (_currentlyCompacting.find(fc->getNameId()) == _currentlyCompacting.end())) { @@ -429,7 +428,7 @@ LogDataStore::compactWorst(double bloatLimit, double spreadLimit, bool prioritiz } } -SerialNum LogDataStore::flushFile(LockGuard guard, WriteableFileChunk & file, SerialNum syncToken) { +SerialNum LogDataStore::flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken) { (void) guard; uint64_t lastSerial(file.getSerialNum()); if (lastSerial > syncToken) { @@ -439,7 +438,7 @@ SerialNum LogDataStore::flushFile(LockGuard guard, WriteableFileChunk & file, Se return syncToken; } -void LogDataStore::flushFileAndWait(LockGuard guard, WriteableFileChunk & file, SerialNum syncToken) { +void LogDataStore::flushFileAndWait(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken) { syncToken = flushFile(std::move(guard), file, syncToken); file.waitForDiskToCatchUpToNow(); _tlSyncer.sync(syncToken); @@ -447,13 +446,13 @@ void LogDataStore::flushFileAndWait(LockGuard guard, WriteableFileChunk & file, } SerialNum LogDataStore::flushActive(SerialNum syncToken) { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); WriteableFileChunk &active = getActive(guard); return flushFile(std::move(guard), active, syncToken); } void LogDataStore::flushActiveAndWait(SerialNum syncToken) { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); WriteableFileChunk &active = getActive(guard); return flushFileAndWait(std::move(guard), active, syncToken); } @@ -462,7 +461,7 @@ bool LogDataStore::shouldCompactToActiveFile(size_t compactedSize) const { return (_config.getMinFileSizeFactor() * _config.getMaxFileSize() > compactedSize); } -void LogDataStore::setNewFileChunk(const LockGuard & guard, FileChunk::UP file) +void LogDataStore::setNewFileChunk(const MonitorGuard & guard, FileChunk::UP file) { assert(hasUpdateLock(guard)); size_t fileId = file->getFileId().getId(); @@ -480,7 +479,7 @@ void LogDataStore::compactFile(FileId fileId) FileId destinationFileId = FileId::active(); if (_bucketizer) { if ( ! shouldCompactToActiveFile(fc->getDiskFootprint() - fc->getDiskBloat())) { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); destinationFileId = allocateFileId(guard); setNewFileChunk(guard, createWritableFile(destinationFileId, fc->getLastPersistedSerialNum(), fc->getNameId().next())); } @@ -496,7 +495,7 @@ void LogDataStore::compactFile(FileId fileId) if (destinationFileId.isActive()) { flushActiveAndWait(0); } else { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); auto & compactTo = dynamic_cast<WriteableFileChunk &>(*_fileChunks[destinationFileId.getId()]); flushFileAndWait(std::move(guard), compactTo, 0); compactTo.freeze(); @@ -506,14 +505,14 @@ void LogDataStore::compactFile(FileId fileId) std::this_thread::sleep_for(1s); uint64_t currentGeneration; { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); currentGeneration = _genHandler.getCurrentGeneration(); _genHandler.incGeneration(); } FileChunk::UP toDie; for (;;) { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); _genHandler.updateFirstUsedGeneration(); if (currentGeneration < _genHandler.getFirstUsedGeneration()) { if (_holdFileChunks[fc->getFileId().getId()] == 0u) { @@ -529,7 +528,7 @@ void LogDataStore::compactFile(FileId fileId) std::this_thread::sleep_for(1s); } toDie->erase(); - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); _currentlyCompacting.erase(compactedNameId); } @@ -538,7 +537,7 @@ LogDataStore::memoryUsed() const { size_t sz(memoryMeta()); { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); for (const FileChunk::UP & fc : _fileChunks) { if (fc) { sz += fc->getMemoryFootprint(); @@ -551,7 +550,7 @@ LogDataStore::memoryUsed() const size_t LogDataStore::memoryMeta() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); size_t sz(_lidInfo.getMemoryUsage().allocatedBytes()); for (const FileChunk::UP & fc : _fileChunks) { if (fc) { @@ -562,7 +561,7 @@ LogDataStore::memoryMeta() const } FileChunk::FileId -LogDataStore::allocateFileId(const LockGuard & guard) +LogDataStore::allocateFileId(const MonitorGuard & guard) { (void) guard; for (size_t i(0); i < _fileChunks.size(); i++) { @@ -581,7 +580,7 @@ LogDataStore::allocateFileId(const LockGuard & guard) size_t LogDataStore::getDiskFootprint() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); size_t sz(0); for (const FileChunk::UP & fc : _fileChunks) { if (fc) { @@ -595,7 +594,7 @@ LogDataStore::getDiskFootprint() const size_t LogDataStore::getDiskHeaderFootprint() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); size_t sz(0); for (const FileChunk::UP & fc : _fileChunks) { if (fc) { @@ -609,7 +608,7 @@ LogDataStore::getDiskHeaderFootprint() const size_t LogDataStore::getDiskBloat() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); size_t sz(0); for (FileId i(0); i < FileId(_fileChunks.size()); i = i.next()) { /// Do not count the holes in the last file as bloat @@ -853,7 +852,7 @@ LogDataStore::findIncompleteCompactedFiles(const NameIdSet & partList) { LogDataStore::NameIdSet LogDataStore::getAllActiveFiles() const { NameIdSet files; - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); for (const auto & fc : _fileChunks) { if (fc) { files.insert(fc->getNameId()); @@ -948,7 +947,7 @@ LogDataStore::scanDir(const vespalib::string &dir, const vespalib::string &suffi } void -LogDataStore::setLid(const LockGuard &guard, uint32_t lid, const LidInfo &meta) +LogDataStore::setLid(const MonitorGuard &guard, uint32_t lid, const LidInfo &meta) { (void) guard; if (lid < _lidInfo.size()) { @@ -1009,7 +1008,7 @@ LogDataStore::computeNumberOfSignificantBucketIdBits(const IBucketizer & bucketi void LogDataStore::verify(bool reportOnly) const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); for (const FileChunk::UP & fc : _fileChunks) { if (fc) { fc->verify(reportOnly); @@ -1022,7 +1021,7 @@ class LogDataStore::WrapVisitor : public IWriteData IDataStoreVisitor &_visitor; public: - void write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override { + void write(MonitorGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override { (void) chunkId; guard.unlock(); _visitor.visit(lid, buffer, sz); @@ -1101,7 +1100,7 @@ LogDataStore::accept(IDataStoreVisitor &visitor, internalFlushAll(); FileChunk::UP toDie; { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); toDie = std::move(_fileChunks[fcId.getId()]); } toDie->erase(); @@ -1117,7 +1116,7 @@ double LogDataStore::getVisitCost() const { uint32_t totalChunks = 0; - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); for (auto &fc : _fileChunks) { totalChunks += fc->getNumChunks(); } @@ -1146,7 +1145,7 @@ LogDataStore::holdFileChunk(FileId fileId) void LogDataStore::unholdFileChunk(FileId fileId) { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); assert(fileId.getId() < _holdFileChunks.size()); assert(_holdFileChunks[fileId.getId()] > 0u); --_holdFileChunks[fileId.getId()]; @@ -1170,7 +1169,7 @@ LogDataStore::getStorageStats() const vespalib::MemoryUsage LogDataStore::getMemoryUsage() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); vespalib::MemoryUsage result; result.merge(_lidInfo.getMemoryUsage()); for (const auto &fileChunk : _fileChunks) { @@ -1186,7 +1185,7 @@ LogDataStore::getFileChunkStats() const { std::vector<DataStoreFileChunkStats> result; { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); for (const FileChunk::UP & fc : _fileChunks) { if (fc) { result.push_back(fc->getStats()); @@ -1200,7 +1199,7 @@ LogDataStore::getFileChunkStats() const void LogDataStore::compactLidSpace(uint32_t wantedDocLidLimit) { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); assert(wantedDocLidLimit <= getDocIdLimit()); for (size_t i = wantedDocLidLimit; i < _lidInfo.size(); ++i) { _lidInfo[i] = LidInfo(); @@ -1213,12 +1212,12 @@ LogDataStore::compactLidSpace(uint32_t wantedDocLidLimit) bool LogDataStore::canShrinkLidSpace() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); return canShrinkLidSpace(guard); } bool -LogDataStore::canShrinkLidSpace(const LockGuard &) const +LogDataStore::canShrinkLidSpace(const MonitorGuard &) const { return getDocIdLimit() < _lidInfo.size() && _compactLidSpaceGeneration < _genHandler.getFirstUsedGeneration(); @@ -1227,7 +1226,7 @@ LogDataStore::canShrinkLidSpace(const LockGuard &) const size_t LogDataStore::getEstimatedShrinkLidSpaceGain() const { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); if (!canShrinkLidSpace(guard)) { return 0; } @@ -1237,7 +1236,7 @@ LogDataStore::getEstimatedShrinkLidSpaceGain() const void LogDataStore::shrinkLidSpace() { - LockGuard guard(_updateLock); + MonitorGuard guard(_updateLock); if (!canShrinkLidSpace(guard)) { return; } diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.h b/searchlib/src/vespa/searchlib/docstore/logdatastore.h index 12073c49cde..4d737801c22 100644 --- a/searchlib/src/vespa/searchlib/docstore/logdatastore.h +++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.h @@ -33,7 +33,7 @@ private: using FileId = FileChunk::FileId; public: using NameIdSet = std::set<NameId>; - using LockGuard = std::unique_lock<std::mutex>; + using MonitorGuard = std::unique_lock<std::mutex>; using CompressionConfig = vespalib::compression::CompressionConfig; class Config { public: @@ -119,8 +119,8 @@ public: const Config & getConfig() const { return _config; } Config & getConfig() { return _config; } - void write(LockGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid, const void * buffer, size_t len); - void write(LockGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len); + void write(MonitorGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid, const void * buffer, size_t len); + void write(MonitorGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len); /** * This will spinn through the data and verify the content of both @@ -146,9 +146,9 @@ public: } // Implements IGetLid API - IGetLid::LockGuard getLidGuard(uint32_t lid) const override { + IGetLid::unique_lock getLidGuard(uint32_t lid) const override { (void) lid; - return IGetLid::LockGuard(_updateLock); + return IGetLid::unique_lock(_updateLock); } // Implements IGetLid API @@ -160,12 +160,12 @@ public: return LidInfo(); } } - FileId getActiveFileId(const LockGuard & guard) const { + FileId getActiveFileId(const MonitorGuard & guard) const { assert(hasUpdateLock(guard)); (void) guard; return _active; } - bool hasUpdateLock(const LockGuard & guard) const { + bool hasUpdateLock(const MonitorGuard & guard) const { return (guard.mutex() == &_updateLock) && guard.owns_lock(); } @@ -188,7 +188,7 @@ private: class FileChunkHolder; // Implements ISetLid API - void setLid(const ISetLid::LockGuard & guard, uint32_t lid, const LidInfo & lm) override; + void setLid(const ISetLid::unique_lock & guard, uint32_t lid, const LidInfo & lm) override; void compactWorst(double bloatLimit, double spreadLimit, bool prioritizeDiskBloat); void compactFile(FileId chunkId); @@ -209,25 +209,25 @@ private: bool isTotalDiskBloatExceeded(size_t diskFootPrint, size_t bloat) const; NameIdSet scanDir(const vespalib::string &dir, const vespalib::string &suffix); - FileId allocateFileId(const LockGuard & guard); - void setNewFileChunk(const LockGuard & guard, FileChunk::UP fileChunk); + FileId allocateFileId(const MonitorGuard & guard); + void setNewFileChunk(const MonitorGuard & guard, FileChunk::UP fileChunk); vespalib::string ls(const NameIdSet & partList); - WriteableFileChunk & getActive(const LockGuard & guard) { + WriteableFileChunk & getActive(const MonitorGuard & guard) { assert(hasUpdateLock(guard)); return static_cast<WriteableFileChunk &>(*_fileChunks[_active.getId()]); } - const WriteableFileChunk & getActive(const LockGuard & guard) const { + const WriteableFileChunk & getActive(const MonitorGuard & guard) const { assert(hasUpdateLock(guard)); return static_cast<const WriteableFileChunk &>(*_fileChunks[_active.getId()]); } - const FileChunk * getPrevActive(const LockGuard & guard) const { + const FileChunk * getPrevActive(const MonitorGuard & guard) const { assert(hasUpdateLock(guard)); return ( !_prevActive.isActive() ) ? _fileChunks[_prevActive.getId()].get() : nullptr; } - void setActive(const LockGuard & guard, FileId fileId) { + void setActive(const MonitorGuard & guard, FileId fileId) { assert(hasUpdateLock(guard)); _prevActive = _active; _active = fileId; @@ -242,7 +242,7 @@ private: vespalib::string createDatFileName(NameId id) const; vespalib::string createIdxFileName(NameId id) const; - void requireSpace(LockGuard guard, WriteableFileChunk & active); + void requireSpace(MonitorGuard guard, WriteableFileChunk & active); bool isReadOnly() const { return _readOnly; } void updateSerialNum(); @@ -259,17 +259,17 @@ private: */ void unholdFileChunk(FileId fileId); - SerialNum flushFile(LockGuard guard, WriteableFileChunk & file, SerialNum syncToken); + SerialNum flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken); SerialNum flushActive(SerialNum syncToken); void flushActiveAndWait(SerialNum syncToken); - void flushFileAndWait(LockGuard guard, WriteableFileChunk & file, SerialNum syncToken); + void flushFileAndWait(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken); SerialNum getMinLastPersistedSerialNum() const { return (_fileChunks.empty() ? 0 : _fileChunks.back()->getLastPersistedSerialNum()); } bool shouldCompactToActiveFile(size_t compactedSize) const; std::pair<bool, FileId> findNextToCompact(double bloatLimit, double spreadLimit, bool prioritizeDiskBloat); void incGeneration(); - bool canShrinkLidSpace(const LockGuard &guard) const; + bool canShrinkLidSpace(const MonitorGuard &guard) const; typedef std::vector<FileId> FileIdxVector; Config _config; diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp index d7711b61d78..6418305e01f 100644 --- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp +++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp @@ -18,7 +18,8 @@ StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executo _where(), _backingMemory(backingMemory), _executor(executor), - _monitor(), + _lock(std::make_unique<std::mutex>()), + _cond(std::make_unique<std::condition_variable>()), _numChunksPosted(0), _chunks(), _compression(compression) @@ -50,7 +51,7 @@ StoreByBucket::createChunk() size_t StoreByBucket::getChunkCount() const { - vespalib::LockGuard guard(_monitor); + std::lock_guard guard(*_lock); return _chunks.size(); } @@ -61,24 +62,24 @@ StoreByBucket::closeChunk(Chunk::UP chunk) chunk->pack(1, buffer, _compression); buffer.shrink(buffer.getDataLen()); ConstBufferRef bufferRef(_backingMemory.push_back(buffer.getData(), buffer.getDataLen()).data(), buffer.getDataLen()); - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(*_lock); _chunks[chunk->getId()] = bufferRef; if (_numChunksPosted == _chunks.size()) { - guard.signal(); + _cond->notify_one(); } } void StoreByBucket::incChunksPosted() { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(*_lock); _numChunksPosted++; } void StoreByBucket::waitAllProcessed() { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(*_lock); while (_numChunksPosted != _chunks.size()) { - guard.wait(); + _cond->wait(guard); } } diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.h b/searchlib/src/vespa/searchlib/docstore/storebybucket.h index 1365dcb4416..0716a7c3ffc 100644 --- a/searchlib/src/vespa/searchlib/docstore/storebybucket.h +++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.h @@ -6,9 +6,9 @@ #include <vespa/document/bucket/bucketid.h> #include <vespa/vespalib/data/memorydatastore.h> #include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/stllike/hash_map.h> #include <map> +#include <condition_variable> namespace search::docstore { @@ -72,7 +72,8 @@ private: std::map<uint64_t, IndexVector> _where; MemoryDataStore & _backingMemory; Executor & _executor; - vespalib::Monitor _monitor; + std::unique_ptr<std::mutex> _lock; + std::unique_ptr<std::condition_variable> _cond; size_t _numChunksPosted; vespalib::hash_map<uint64_t, ConstBufferRef> _chunks; CompressionConfig _compression; diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp index cdf6220dfbc..8f03bf9e791 100644 --- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp @@ -17,8 +17,6 @@ LOG_SETUP(".search.writeablefilechunk"); using vespalib::makeLambdaTask; using vespalib::FileHeader; using vespalib::make_string; -using vespalib::LockGuard; -using vespalib::MonitorGuard; using vespalib::nbostream; using vespalib::IllegalHeaderException; using vespalib::GenerationHandler; @@ -89,6 +87,7 @@ WriteableFileChunk(vespalib::Executor &executor, _serialNum(initialSerialNum), _frozen(false), _lock(), + _cond(), _writeLock(), _flushLock(), _dataFile(_dataFileName.c_str()), @@ -105,6 +104,8 @@ WriteableFileChunk(vespalib::Executor &executor, _maxChunkSize(0x100000), _firstChunkIdToBeWritten(0), _writeTaskIsRunning(false), + _writeMonitor(), + _writeCond(), _executor(executor), _bucketMap(bucketizer) { @@ -171,7 +172,7 @@ WriteableFileChunk::~WriteableFileChunk() } size_t -WriteableFileChunk::updateLidMap(const LockGuard &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit) +WriteableFileChunk::updateLidMap(const unique_lock &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit) { size_t sz = FileChunk::updateLidMap(guard, ds, serialNum, docIdLimit); _nextChunkId = _chunkInfo.size(); @@ -220,7 +221,7 @@ WriteableFileChunk::read(LidInfoWithLidV::const_iterator begin, size_t count, IB vespalib::hash_map<uint32_t, ChunkInfo> chunksOnFile; std::vector<LidAndBuffer> buffers; { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); for (size_t i(0); i < count; i++) { const LidInfoWithLid & li = *(begin + i); uint32_t chunk = li.getChunkId(); @@ -260,7 +261,7 @@ WriteableFileChunk::read(uint32_t lid, SubChunkId chunkId, vespalib::DataBuffer { ChunkInfo chunkInfo; if (!frozen()) { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); if ((chunkId >= _chunkInfo.size()) || !_chunkInfo[chunkId].valid()) { auto found = _chunkMap.find(chunkId); if (found != _chunkMap.end()) { @@ -282,7 +283,7 @@ WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum) { Chunk * active(nullptr); { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); active = _chunkMap[chunkId].get(); } @@ -298,7 +299,7 @@ WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum) tmp->getBuf().moveFreeToData(padAfter); } { - vespalib::LockGuard innerGuard(_lock); + std::lock_guard innerGuard(_lock); setDiskFootprint(FileChunk::getDiskFootprint() + tmp->getBuf().getDataLen()); } enque(std::move(tmp)); @@ -308,16 +309,16 @@ void WriteableFileChunk::enque(ProcessedChunkUP tmp) { LOG(debug, "enqueing %p", tmp.get()); - MonitorGuard guard(_writeMonitor); + std::unique_lock guard(_writeMonitor); _writeQ.push_back(std::move(tmp)); if ( ! _writeTaskIsRunning) { _writeTaskIsRunning = true; uint32_t nextChunkId = _firstChunkIdToBeWritten; - guard.signal(); guard.unlock(); + _writeCond.notify_one(); restart(nextChunkId); } else { - guard.signal(); + _writeCond.notify_one(); } } @@ -356,12 +357,13 @@ getAlignedStartPos(FastOS_File & file) } WriteableFileChunk::ProcessedChunkQ -WriteableFileChunk::drainQ(MonitorGuard & guard) +WriteableFileChunk::drainQ(unique_lock & guard) { + assert(guard.mutex() == &_writeMonitor && guard.owns_lock()); ProcessedChunkQ newChunks; newChunks.swap(_writeQ); if ( ! newChunks.empty() ) { - guard.broadcast(); + _writeCond.notify_one(); } return newChunks; } @@ -396,11 +398,11 @@ WriteableFileChunk::fetchNextChain(ProcessedChunkMap & orderedChunks, const uint } ChunkMeta -WriteableFileChunk::computeChunkMeta(const vespalib::LockGuard & guard, +WriteableFileChunk::computeChunkMeta(const unique_lock & guard, const GenerationHandler::Guard & bucketizerGuard, size_t offset, const ProcessedChunk & tmp, const Chunk & active) { - assert(guard.locks(_lock)); + assert((guard.mutex() == &_lock) && guard.owns_lock()); size_t dataLen = tmp.getBuf().getDataLen(); const ChunkMeta cmeta(offset, tmp.getPayLoad(), active.getLastSerial(), active.count()); assert((size_t(tmp.getBuf().getData())%_alignment) == 0); @@ -431,8 +433,7 @@ WriteableFileChunk::computeChunkMeta(ProcessedChunkQ & chunks, size_t startPos, ChunkMetaV cmetaV; cmetaV.reserve(chunks.size()); uint64_t lastSerial(_lastPersistedSerialNum); - (void) lastSerial; - vespalib::LockGuard guard(_lock); + std::unique_lock guard(_lock); if (!_pendingChunks.empty()) { const PendingChunk & pc = *_pendingChunks.back(); @@ -468,7 +469,7 @@ WriteableFileChunk::writeData(const ProcessedChunkQ & chunks, size_t sz) buf.writeBytes(chunk->getBuf().getData(), chunk->getBuf().getDataLen()); } - LockGuard guard(_writeLock); + std::lock_guard guard(_writeLock); ssize_t wlen = _dataFile.Write2(buf.getData(), buf.getDataLen()); if (wlen != static_cast<ssize_t>(buf.getDataLen())) { throw SummaryException(make_string("Failed writing %ld bytes to dat file. Only %ld written", @@ -481,7 +482,7 @@ WriteableFileChunk::writeData(const ProcessedChunkQ & chunks, size_t sz) void WriteableFileChunk::updateChunkInfo(const ProcessedChunkQ & chunks, const ChunkMetaV & cmetaV, size_t sz) { - MonitorGuard guard(_lock); + std::lock_guard guard(_lock); size_t nettoSz(sz); for (size_t i(0); i < chunks.size(); i++) { const ProcessedChunk & chunk = *chunks[i]; @@ -496,7 +497,7 @@ WriteableFileChunk::updateChunkInfo(const ProcessedChunkQ & chunks, const ChunkM _chunkMap.erase(_chunkMap.begin()); } setDiskFootprint(FileChunk::getDiskFootprint() - nettoSz); - guard.broadcast(); + _cond.notify_all(); } void @@ -505,7 +506,7 @@ WriteableFileChunk::fileWriter(const uint32_t firstChunkId) LOG(debug, "Starting the filewriter with chunkid = %d", firstChunkId); uint32_t nextChunkId(firstChunkId); bool done(false); - MonitorGuard guard(_writeMonitor); + std::unique_lock guard(_writeMonitor); { for (ProcessedChunkQ newChunks(drainQ(guard)); !newChunks.empty(); newChunks = drainQ(guard)) { guard.unlock(); @@ -518,7 +519,7 @@ WriteableFileChunk::fileWriter(const uint32_t firstChunkId) writeData(chunks, sz); updateChunkInfo(chunks, cmetaV, sz); LOG(spam, "bucket spread = '%3.2f'", getBucketSpread()); - guard = MonitorGuard(_writeMonitor); + guard = std::unique_lock(_writeMonitor); if (done) break; } } @@ -532,7 +533,7 @@ WriteableFileChunk::fileWriter(const uint32_t firstChunkId) (void) cm; assert(cm.valid() && cm.getSize() != 0); } - guard.broadcast(); + _writeCond.notify_all(); } else { _firstChunkIdToBeWritten = nextChunkId; } @@ -541,7 +542,7 @@ WriteableFileChunk::fileWriter(const uint32_t firstChunkId) vespalib::system_time WriteableFileChunk::getModificationTime() const { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); return _modificationTime; } @@ -552,15 +553,15 @@ WriteableFileChunk::freeze() waitForAllChunksFlushedToDisk(); enque(ProcessedChunkUP()); { - MonitorGuard guard(_writeMonitor); + std::unique_lock guard(_writeMonitor); while (_writeTaskIsRunning) { - guard.wait(10); + _writeCond.wait_for(guard, 10ms); } } assert(_writeQ.empty()); assert(_chunkMap.empty()); { - MonitorGuard guard(_lock); + std::unique_lock guard(_lock); setDiskFootprint(getDiskFootprint(guard)); _frozen = true; } @@ -576,16 +577,15 @@ WriteableFileChunk::getDiskFootprint() const return FileChunk::getDiskFootprint(); } else { // Double checked locking. - MonitorGuard guard(_lock); + std::unique_lock guard(_lock); return getDiskFootprint(guard); } } size_t -WriteableFileChunk::getDiskFootprint(const vespalib::MonitorGuard & guard) const +WriteableFileChunk::getDiskFootprint(const unique_lock & guard) const { - (void) guard; - assert(guard.monitors(_lock)); + assert(guard.mutex() == &_lock && guard.owns_lock()); return frozen() ? FileChunk::getDiskFootprint() : _currentDiskFootprint + FileChunk::getDiskFootprint(); @@ -595,7 +595,7 @@ size_t WriteableFileChunk::getMemoryFootprint() const { size_t sz(0); - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); for (const auto & it : _chunkMap) { sz += it.second->size(); } @@ -613,7 +613,7 @@ WriteableFileChunk::getMemoryMetaFootprint() const vespalib::MemoryUsage WriteableFileChunk::getMemoryUsage() const { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); vespalib::MemoryUsage result; for (const auto &chunk : _chunkMap) { result.merge(chunk.second->getMemoryUsage()); @@ -628,11 +628,11 @@ WriteableFileChunk::getMemoryUsage() const int32_t WriteableFileChunk::flushLastIfNonEmpty(bool force) { int32_t chunkId(-1); - MonitorGuard guard(_lock); + std::unique_lock guard(_lock); for (bool ready(false); !ready;) { if (_chunkMap.size() > 1000) { LOG(debug, "Summary write overload at least 1000 outstanding chunks. Suspending."); - guard.wait(); + _cond.wait(guard); LOG(debug, "Summary write overload eased off. Commencing."); } else { ready = true; @@ -656,7 +656,7 @@ WriteableFileChunk::flush(bool block, uint64_t syncToken) _executor.execute(makeLambdaTask([this, chunkId, serialNum=_serialNum] { internalFlush(chunkId, serialNum); })); } else { if (block) { - MonitorGuard guard(_lock); + std::lock_guard guard(_lock); if (!_chunkMap.empty()) { chunkId = _chunkMap.rbegin()->first; } @@ -672,7 +672,7 @@ WriteableFileChunk::waitForDiskToCatchUpToNow() const { int32_t chunkId(-1); { - MonitorGuard guard(_lock); + std::lock_guard guard(_lock); if (!_chunkMap.empty()) { chunkId = _chunkMap.rbegin()->first; } @@ -683,18 +683,18 @@ WriteableFileChunk::waitForDiskToCatchUpToNow() const void WriteableFileChunk::waitForChunkFlushedToDisk(uint32_t chunkId) const { - MonitorGuard guard(_lock); + std::unique_lock guard(_lock); while( _chunkMap.find(chunkId) != _chunkMap.end() ) { - guard.wait(); + _cond.wait(guard); } } void WriteableFileChunk::waitForAllChunksFlushedToDisk() const { - MonitorGuard guard(_lock); + std::unique_lock guard(_lock); while( ! _chunkMap.empty() ) { - guard.wait(); + _cond.wait(guard); } } @@ -811,15 +811,15 @@ WriteableFileChunk::writeIdxHeader(const FileHeaderContext &fileHeaderContext, u bool WriteableFileChunk::needFlushPendingChunks(uint64_t serialNum, uint64_t datFileLen) { - MonitorGuard guard(_lock); + std::unique_lock guard(_lock); return needFlushPendingChunks(guard, serialNum, datFileLen); } bool -WriteableFileChunk::needFlushPendingChunks(const MonitorGuard & guard, uint64_t serialNum, uint64_t datFileLen) +WriteableFileChunk::needFlushPendingChunks(const unique_lock & guard, uint64_t serialNum, uint64_t datFileLen) { (void) guard; - assert(guard.monitors(_lock)); + assert(guard.mutex() == &_lock && guard.owns_lock()); if (_pendingChunks.empty()) return false; const PendingChunk & pc = *_pendingChunks.front(); @@ -851,12 +851,12 @@ WriteableFileChunk::flushPendingChunks(uint64_t serialNum) { if (needFlushPendingChunks(serialNum, datFileLen)) { timeStamp = unconditionallyFlushPendingChunks(flushGuard, serialNum, datFileLen); } - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); _modificationTime = std::max(timeStamp, _modificationTime); } vespalib::system_time -WriteableFileChunk::unconditionallyFlushPendingChunks(const LockGuard &flushGuard, uint64_t serialNum, uint64_t datFileLen) +WriteableFileChunk::unconditionallyFlushPendingChunks(const unique_lock &flushGuard, uint64_t serialNum, uint64_t datFileLen) { assert((flushGuard.mutex() == &_flushLock) && flushGuard.owns_lock()); if ( ! _dataFile.Sync()) { @@ -865,7 +865,7 @@ WriteableFileChunk::unconditionallyFlushPendingChunks(const LockGuard &flushGuar nbostream os; uint64_t lastSerial = 0; { - MonitorGuard guard(_lock); + std::unique_lock guard(_lock); lastSerial = _lastPersistedSerialNum; for (;;) { if (!needFlushPendingChunks(guard, serialNum, datFileLen)) diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h index cdbbd51ec77..901e70269ec 100644 --- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h +++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h @@ -4,11 +4,11 @@ #include "filechunk.h" #include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/searchlib/transactionlog/syncproxy.h> #include <vespa/fastos/file.h> #include <map> #include <deque> +#include <condition_variable> namespace search { @@ -64,7 +64,7 @@ public: size_t getMemoryFootprint() const override; size_t getMemoryMetaFootprint() const override; vespalib::MemoryUsage getMemoryUsage() const override; - size_t updateLidMap(const LockGuard &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit) override; + size_t updateLidMap(const unique_lock &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit) override; void waitForDiskToCatchUpToNow() const; void flushPendingChunks(uint64_t serialNum); DataStoreFileChunkStats getStats() const override; @@ -85,30 +85,31 @@ private: int32_t flushLastIfNonEmpty(bool force); // _writeMonitor should not be held when calling restart void restart(uint32_t nextChunkId); - ProcessedChunkQ drainQ(vespalib::MonitorGuard & guard); + ProcessedChunkQ drainQ(unique_lock & guard); void readDataHeader(); void readIdxHeader(FastOS_FileInterface & idxFile); void writeDataHeader(const common::FileHeaderContext &fileHeaderContext); bool needFlushPendingChunks(uint64_t serialNum, uint64_t datFileLen); - bool needFlushPendingChunks(const vespalib::MonitorGuard & guard, uint64_t serialNum, uint64_t datFileLen); - vespalib::system_time unconditionallyFlushPendingChunks(const LockGuard & flushGuard, uint64_t serialNum, uint64_t datFileLen); - static void insertChunks(ProcessedChunkMap & orderedChunks, ProcessedChunkQ & newChunks, const uint32_t nextChunkId); - static ProcessedChunkQ fetchNextChain(ProcessedChunkMap & orderedChunks, const uint32_t firstChunkId); - ChunkMeta computeChunkMeta(const vespalib::LockGuard & guard, + bool needFlushPendingChunks(const unique_lock & guard, uint64_t serialNum, uint64_t datFileLen); + vespalib::system_time unconditionallyFlushPendingChunks(const unique_lock & flushGuard, uint64_t serialNum, uint64_t datFileLen); + static void insertChunks(ProcessedChunkMap & orderedChunks, ProcessedChunkQ & newChunks, uint32_t nextChunkId); + static ProcessedChunkQ fetchNextChain(ProcessedChunkMap & orderedChunks, uint32_t firstChunkId); + ChunkMeta computeChunkMeta(const unique_lock & guard, const vespalib::GenerationHandler::Guard & bucketizerGuard, size_t offset, const ProcessedChunk & tmp, const Chunk & active); ChunkMetaV computeChunkMeta(ProcessedChunkQ & chunks, size_t startPos, size_t & sz, bool & done); void writeData(const ProcessedChunkQ & chunks, size_t sz); void updateChunkInfo(const ProcessedChunkQ & chunks, const ChunkMetaV & cmetaV, size_t sz); void updateCurrentDiskFootprint(); - size_t getDiskFootprint(const vespalib::MonitorGuard & guard) const; + size_t getDiskFootprint(const unique_lock & guard) const; std::unique_ptr<FastOS_FileInterface> openIdx(); Config _config; SerialNum _serialNum; bool _frozen; // Lock order is _writeLock, _flushLock, _lock - vespalib::Monitor _lock; + mutable std::mutex _lock; + mutable std::condition_variable _cond; std::mutex _writeLock; std::mutex _flushLock; FastOS_File _dataFile; @@ -127,7 +128,8 @@ private: size_t _maxChunkSize; uint32_t _firstChunkIdToBeWritten; bool _writeTaskIsRunning; - vespalib::Monitor _writeMonitor; + std::mutex _writeMonitor; + std::condition_variable _writeCond; ProcessedChunkQ _writeQ; vespalib::Executor & _executor; ProcessedChunkMap _orderedChunks; |