summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-14 12:38:17 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-14 13:06:17 +0000
commit749dc5565a6a99ec3323ffb366dcf60435cc8d21 (patch)
tree54e9967999a02c48a12a92aaea34573822cdb87f /searchlib
parent03a7e30be66cf799c4e0b703444f45249ebd7880 (diff)
Use std::mutex/std::condition_variable
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/docstore/filechunk.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/docstore/filechunk.h4
-rw-r--r--searchlib/src/vespa/searchlib/docstore/lid_info.h8
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.cpp87
-rw-r--r--searchlib/src/vespa/searchlib/docstore/logdatastore.h36
-rw-r--r--searchlib/src/vespa/searchlib/docstore/storebybucket.cpp15
-rw-r--r--searchlib/src/vespa/searchlib/docstore/storebybucket.h5
-rw-r--r--searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp92
-rw-r--r--searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h24
10 files changed, 139 insertions, 136 deletions
diff --git a/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp b/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp
index 1e99221551f..c1321920e3b 100644
--- a/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp
+++ b/searchlib/src/tests/docstore/file_chunk/file_chunk_test.cpp
@@ -29,7 +29,7 @@ struct MyFileHeaderContext : public FileHeaderContext {
struct SetLidObserver : public ISetLid {
std::vector<uint32_t> lids;
- void setLid(const LockGuard &guard, uint32_t lid, const LidInfo &lidInfo) override {
+ void setLid(const unique_lock &guard, uint32_t lid, const LidInfo &lidInfo) override {
(void) guard;
(void) lidInfo;
lids.push_back(lid);
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
index d66e178717c..408286cc8b9 100644
--- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
@@ -161,7 +161,7 @@ FileChunk::erase()
}
size_t
-FileChunk::updateLidMap(const LockGuard &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit)
+FileChunk::updateLidMap(const unique_lock &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit)
{
size_t sz(0);
assert(_chunkInfo.empty());
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.h b/searchlib/src/vespa/searchlib/docstore/filechunk.h
index 3febb51ca69..580e3a80e02 100644
--- a/searchlib/src/vespa/searchlib/docstore/filechunk.h
+++ b/searchlib/src/vespa/searchlib/docstore/filechunk.h
@@ -72,7 +72,7 @@ private:
class FileChunk
{
public:
- using LockGuard = std::unique_lock<std::mutex>;
+ using unique_lock = std::unique_lock<std::mutex>;
class NameId {
public:
explicit NameId(size_t id) noexcept : _id(id) { }
@@ -109,7 +109,7 @@ public:
const IBucketizer *bucketizer, bool skipCrcOnRead);
virtual ~FileChunk();
- virtual size_t updateLidMap(const LockGuard &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit);
+ virtual size_t updateLidMap(const unique_lock &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit);
virtual ssize_t read(uint32_t lid, SubChunkId chunk, vespalib::DataBuffer & buffer) const;
virtual void read(LidInfoWithLidV::const_iterator begin, size_t count, IBufferVisitor & visitor) const;
void remove(uint32_t lid, uint32_t size);
diff --git a/searchlib/src/vespa/searchlib/docstore/lid_info.h b/searchlib/src/vespa/searchlib/docstore/lid_info.h
index 152246631a7..c30e851a07c 100644
--- a/searchlib/src/vespa/searchlib/docstore/lid_info.h
+++ b/searchlib/src/vespa/searchlib/docstore/lid_info.h
@@ -68,20 +68,20 @@ typedef std::vector<LidInfoWithLid> LidInfoWithLidV;
class ISetLid
{
public:
- using LockGuard = std::unique_lock<std::mutex>;
+ using unique_lock = std::unique_lock<std::mutex>;
virtual ~ISetLid() = default;
- virtual void setLid(const LockGuard & guard, uint32_t lid, const LidInfo & lm) = 0;
+ virtual void setLid(const unique_lock & guard, uint32_t lid, const LidInfo & lm) = 0;
};
class IGetLid
{
public:
using Guard = vespalib::GenerationHandler::Guard;
- using LockGuard = std::unique_lock<std::mutex>;
+ using unique_lock = std::unique_lock<std::mutex>;
virtual ~IGetLid() = default;
virtual LidInfo getLid(const Guard & guard, uint32_t lid) const = 0;
- virtual LockGuard getLidGuard(uint32_t lid) const = 0;
+ virtual unique_lock getLidGuard(uint32_t lid) const = 0;
virtual Guard getLidReadGuard() const = 0;
};
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
index 37510afc572..76c672b0d9a 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.cpp
@@ -21,7 +21,6 @@ namespace {
constexpr uint32_t DEFAULT_MAX_LIDS_PER_FILE = 32 * 1024 * 1024;
}
-using vespalib::LockGuard;
using vespalib::getLastErrorString;
using vespalib::getErrorString;
using vespalib::GenerationHandler;
@@ -96,7 +95,7 @@ void LogDataStore::reconfigure(const Config & config) {
void
LogDataStore::updateSerialNum()
{
- LockGuard guard(_updateLock);
+ std::unique_lock guard(_updateLock);
if (getPrevActive(guard) != nullptr) {
if (getActive(guard).getSerialNum() <
getPrevActive(guard)->getLastPersistedSerialNum()) {
@@ -117,7 +116,7 @@ void
LogDataStore::updateLidMap(uint32_t lastFileChunkDocIdLimit)
{
uint64_t lastSerialNum(0);
- LockGuard guard(_updateLock);
+ std::unique_lock guard(_updateLock);
for (size_t i = 0; i < _fileChunks.size(); ++i) {
FileChunk::UP &chunk = _fileChunks[i];
bool lastChunk = ((i + 1) == _fileChunks.size());
@@ -180,20 +179,20 @@ LogDataStore::read(uint32_t lid, vespalib::DataBuffer& buffer) const
void
LogDataStore::write(uint64_t serialNum, uint32_t lid, const void * buffer, size_t len)
{
- LockGuard guard(_updateLock);
+ std::unique_lock guard(_updateLock);
WriteableFileChunk & active = getActive(guard);
write(std::move(guard), active, serialNum, lid, buffer, len);
}
void
-LogDataStore::write(LockGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len)
+LogDataStore::write(MonitorGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len)
{
auto & destination = static_cast<WriteableFileChunk &>(*_fileChunks[destinationFileId.getId()]);
write(std::move(guard), destination, destination.getSerialNum(), lid, buffer, len);
}
void
-LogDataStore::write(LockGuard guard, WriteableFileChunk & destination,
+LogDataStore::write(MonitorGuard guard, WriteableFileChunk & destination,
uint64_t serialNum, uint32_t lid, const void * buffer, size_t len)
{
LidInfo lm = destination.append(serialNum, lid, buffer, len);
@@ -204,7 +203,7 @@ LogDataStore::write(LockGuard guard, WriteableFileChunk & destination,
}
void
-LogDataStore::requireSpace(LockGuard guard, WriteableFileChunk & active)
+LogDataStore::requireSpace(MonitorGuard guard, WriteableFileChunk & active)
{
assert(active.getFileId() == getActiveFileId(guard));
size_t oldSz(active.getDiskFootprint());
@@ -235,7 +234,7 @@ LogDataStore::requireSpace(LockGuard guard, WriteableFileChunk & active)
uint64_t
LogDataStore::lastSyncToken() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
uint64_t lastSerial(getActive(guard).getLastPersistedSerialNum());
if (lastSerial == 0) {
const FileChunk * prev = getPrevActive(guard);
@@ -249,7 +248,7 @@ LogDataStore::lastSyncToken() const
uint64_t
LogDataStore::tentativeLastSyncToken() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
return getActive(guard).getSerialNum();
}
@@ -259,7 +258,7 @@ LogDataStore::getLastFlushTime() const
if (lastSyncToken() == 0) {
return vespalib::system_time();
}
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
vespalib::system_time timeStamp(getActive(guard).getModificationTime());
if (timeStamp == vespalib::system_time()) {
const FileChunk * prev = getPrevActive(guard);
@@ -274,7 +273,7 @@ LogDataStore::getLastFlushTime() const
void
LogDataStore::remove(uint64_t serialNum, uint32_t lid)
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
if (lid < getDocIdLimit()) {
LidInfo lm = _lidInfo[lid];
if (lm.valid()) {
@@ -343,7 +342,7 @@ LogDataStore::flush(uint64_t syncToken)
std::unique_ptr<FileChunkHolder> activeHolder;
assert(syncToken == _initFlushSyncToken);
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
// Note: Feed latency spike
getActive(guard).flush(true, syncToken);
active = &getActive(guard);
@@ -368,7 +367,7 @@ double
LogDataStore::getMaxBucketSpread() const
{
double maxSpread(1.0);
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
for (const FileChunk::UP & fc : _fileChunks) {
if (fc) {
if (_bucketizer && fc->frozen()) {
@@ -385,7 +384,7 @@ LogDataStore::findNextToCompact(double bloatLimit, double spreadLimit, bool prio
typedef std::multimap<double, FileId, std::greater<double>> CostMap;
CostMap worstBloat;
CostMap worstSpread;
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
for (size_t i(0); i < _fileChunks.size(); i++) {
const FileChunk::UP & fc(_fileChunks[i]);
if (fc && fc->frozen() && (_currentlyCompacting.find(fc->getNameId()) == _currentlyCompacting.end())) {
@@ -429,7 +428,7 @@ LogDataStore::compactWorst(double bloatLimit, double spreadLimit, bool prioritiz
}
}
-SerialNum LogDataStore::flushFile(LockGuard guard, WriteableFileChunk & file, SerialNum syncToken) {
+SerialNum LogDataStore::flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken) {
(void) guard;
uint64_t lastSerial(file.getSerialNum());
if (lastSerial > syncToken) {
@@ -439,7 +438,7 @@ SerialNum LogDataStore::flushFile(LockGuard guard, WriteableFileChunk & file, Se
return syncToken;
}
-void LogDataStore::flushFileAndWait(LockGuard guard, WriteableFileChunk & file, SerialNum syncToken) {
+void LogDataStore::flushFileAndWait(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken) {
syncToken = flushFile(std::move(guard), file, syncToken);
file.waitForDiskToCatchUpToNow();
_tlSyncer.sync(syncToken);
@@ -447,13 +446,13 @@ void LogDataStore::flushFileAndWait(LockGuard guard, WriteableFileChunk & file,
}
SerialNum LogDataStore::flushActive(SerialNum syncToken) {
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
WriteableFileChunk &active = getActive(guard);
return flushFile(std::move(guard), active, syncToken);
}
void LogDataStore::flushActiveAndWait(SerialNum syncToken) {
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
WriteableFileChunk &active = getActive(guard);
return flushFileAndWait(std::move(guard), active, syncToken);
}
@@ -462,7 +461,7 @@ bool LogDataStore::shouldCompactToActiveFile(size_t compactedSize) const {
return (_config.getMinFileSizeFactor() * _config.getMaxFileSize() > compactedSize);
}
-void LogDataStore::setNewFileChunk(const LockGuard & guard, FileChunk::UP file)
+void LogDataStore::setNewFileChunk(const MonitorGuard & guard, FileChunk::UP file)
{
assert(hasUpdateLock(guard));
size_t fileId = file->getFileId().getId();
@@ -480,7 +479,7 @@ void LogDataStore::compactFile(FileId fileId)
FileId destinationFileId = FileId::active();
if (_bucketizer) {
if ( ! shouldCompactToActiveFile(fc->getDiskFootprint() - fc->getDiskBloat())) {
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
destinationFileId = allocateFileId(guard);
setNewFileChunk(guard, createWritableFile(destinationFileId, fc->getLastPersistedSerialNum(), fc->getNameId().next()));
}
@@ -496,7 +495,7 @@ void LogDataStore::compactFile(FileId fileId)
if (destinationFileId.isActive()) {
flushActiveAndWait(0);
} else {
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
auto & compactTo = dynamic_cast<WriteableFileChunk &>(*_fileChunks[destinationFileId.getId()]);
flushFileAndWait(std::move(guard), compactTo, 0);
compactTo.freeze();
@@ -506,14 +505,14 @@ void LogDataStore::compactFile(FileId fileId)
std::this_thread::sleep_for(1s);
uint64_t currentGeneration;
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
currentGeneration = _genHandler.getCurrentGeneration();
_genHandler.incGeneration();
}
FileChunk::UP toDie;
for (;;) {
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
_genHandler.updateFirstUsedGeneration();
if (currentGeneration < _genHandler.getFirstUsedGeneration()) {
if (_holdFileChunks[fc->getFileId().getId()] == 0u) {
@@ -529,7 +528,7 @@ void LogDataStore::compactFile(FileId fileId)
std::this_thread::sleep_for(1s);
}
toDie->erase();
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
_currentlyCompacting.erase(compactedNameId);
}
@@ -538,7 +537,7 @@ LogDataStore::memoryUsed() const
{
size_t sz(memoryMeta());
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
for (const FileChunk::UP & fc : _fileChunks) {
if (fc) {
sz += fc->getMemoryFootprint();
@@ -551,7 +550,7 @@ LogDataStore::memoryUsed() const
size_t
LogDataStore::memoryMeta() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
size_t sz(_lidInfo.getMemoryUsage().allocatedBytes());
for (const FileChunk::UP & fc : _fileChunks) {
if (fc) {
@@ -562,7 +561,7 @@ LogDataStore::memoryMeta() const
}
FileChunk::FileId
-LogDataStore::allocateFileId(const LockGuard & guard)
+LogDataStore::allocateFileId(const MonitorGuard & guard)
{
(void) guard;
for (size_t i(0); i < _fileChunks.size(); i++) {
@@ -581,7 +580,7 @@ LogDataStore::allocateFileId(const LockGuard & guard)
size_t
LogDataStore::getDiskFootprint() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
size_t sz(0);
for (const FileChunk::UP & fc : _fileChunks) {
if (fc) {
@@ -595,7 +594,7 @@ LogDataStore::getDiskFootprint() const
size_t
LogDataStore::getDiskHeaderFootprint() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
size_t sz(0);
for (const FileChunk::UP & fc : _fileChunks) {
if (fc) {
@@ -609,7 +608,7 @@ LogDataStore::getDiskHeaderFootprint() const
size_t
LogDataStore::getDiskBloat() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
size_t sz(0);
for (FileId i(0); i < FileId(_fileChunks.size()); i = i.next()) {
/// Do not count the holes in the last file as bloat
@@ -853,7 +852,7 @@ LogDataStore::findIncompleteCompactedFiles(const NameIdSet & partList) {
LogDataStore::NameIdSet
LogDataStore::getAllActiveFiles() const {
NameIdSet files;
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
for (const auto & fc : _fileChunks) {
if (fc) {
files.insert(fc->getNameId());
@@ -948,7 +947,7 @@ LogDataStore::scanDir(const vespalib::string &dir, const vespalib::string &suffi
}
void
-LogDataStore::setLid(const LockGuard &guard, uint32_t lid, const LidInfo &meta)
+LogDataStore::setLid(const MonitorGuard &guard, uint32_t lid, const LidInfo &meta)
{
(void) guard;
if (lid < _lidInfo.size()) {
@@ -1009,7 +1008,7 @@ LogDataStore::computeNumberOfSignificantBucketIdBits(const IBucketizer & bucketi
void
LogDataStore::verify(bool reportOnly) const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
for (const FileChunk::UP & fc : _fileChunks) {
if (fc) {
fc->verify(reportOnly);
@@ -1022,7 +1021,7 @@ class LogDataStore::WrapVisitor : public IWriteData
IDataStoreVisitor &_visitor;
public:
- void write(LockGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override {
+ void write(MonitorGuard guard, uint32_t chunkId, uint32_t lid, const void *buffer, size_t sz) override {
(void) chunkId;
guard.unlock();
_visitor.visit(lid, buffer, sz);
@@ -1101,7 +1100,7 @@ LogDataStore::accept(IDataStoreVisitor &visitor,
internalFlushAll();
FileChunk::UP toDie;
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
toDie = std::move(_fileChunks[fcId.getId()]);
}
toDie->erase();
@@ -1117,7 +1116,7 @@ double
LogDataStore::getVisitCost() const
{
uint32_t totalChunks = 0;
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
for (auto &fc : _fileChunks) {
totalChunks += fc->getNumChunks();
}
@@ -1146,7 +1145,7 @@ LogDataStore::holdFileChunk(FileId fileId)
void
LogDataStore::unholdFileChunk(FileId fileId)
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
assert(fileId.getId() < _holdFileChunks.size());
assert(_holdFileChunks[fileId.getId()] > 0u);
--_holdFileChunks[fileId.getId()];
@@ -1170,7 +1169,7 @@ LogDataStore::getStorageStats() const
vespalib::MemoryUsage
LogDataStore::getMemoryUsage() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
vespalib::MemoryUsage result;
result.merge(_lidInfo.getMemoryUsage());
for (const auto &fileChunk : _fileChunks) {
@@ -1186,7 +1185,7 @@ LogDataStore::getFileChunkStats() const
{
std::vector<DataStoreFileChunkStats> result;
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
for (const FileChunk::UP & fc : _fileChunks) {
if (fc) {
result.push_back(fc->getStats());
@@ -1200,7 +1199,7 @@ LogDataStore::getFileChunkStats() const
void
LogDataStore::compactLidSpace(uint32_t wantedDocLidLimit)
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
assert(wantedDocLidLimit <= getDocIdLimit());
for (size_t i = wantedDocLidLimit; i < _lidInfo.size(); ++i) {
_lidInfo[i] = LidInfo();
@@ -1213,12 +1212,12 @@ LogDataStore::compactLidSpace(uint32_t wantedDocLidLimit)
bool
LogDataStore::canShrinkLidSpace() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
return canShrinkLidSpace(guard);
}
bool
-LogDataStore::canShrinkLidSpace(const LockGuard &) const
+LogDataStore::canShrinkLidSpace(const MonitorGuard &) const
{
return getDocIdLimit() < _lidInfo.size() &&
_compactLidSpaceGeneration < _genHandler.getFirstUsedGeneration();
@@ -1227,7 +1226,7 @@ LogDataStore::canShrinkLidSpace(const LockGuard &) const
size_t
LogDataStore::getEstimatedShrinkLidSpaceGain() const
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
if (!canShrinkLidSpace(guard)) {
return 0;
}
@@ -1237,7 +1236,7 @@ LogDataStore::getEstimatedShrinkLidSpaceGain() const
void
LogDataStore::shrinkLidSpace()
{
- LockGuard guard(_updateLock);
+ MonitorGuard guard(_updateLock);
if (!canShrinkLidSpace(guard)) {
return;
}
diff --git a/searchlib/src/vespa/searchlib/docstore/logdatastore.h b/searchlib/src/vespa/searchlib/docstore/logdatastore.h
index 12073c49cde..4d737801c22 100644
--- a/searchlib/src/vespa/searchlib/docstore/logdatastore.h
+++ b/searchlib/src/vespa/searchlib/docstore/logdatastore.h
@@ -33,7 +33,7 @@ private:
using FileId = FileChunk::FileId;
public:
using NameIdSet = std::set<NameId>;
- using LockGuard = std::unique_lock<std::mutex>;
+ using MonitorGuard = std::unique_lock<std::mutex>;
using CompressionConfig = vespalib::compression::CompressionConfig;
class Config {
public:
@@ -119,8 +119,8 @@ public:
const Config & getConfig() const { return _config; }
Config & getConfig() { return _config; }
- void write(LockGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid, const void * buffer, size_t len);
- void write(LockGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len);
+ void write(MonitorGuard guard, WriteableFileChunk & destination, uint64_t serialNum, uint32_t lid, const void * buffer, size_t len);
+ void write(MonitorGuard guard, FileId destinationFileId, uint32_t lid, const void * buffer, size_t len);
/**
* This will spinn through the data and verify the content of both
@@ -146,9 +146,9 @@ public:
}
// Implements IGetLid API
- IGetLid::LockGuard getLidGuard(uint32_t lid) const override {
+ IGetLid::unique_lock getLidGuard(uint32_t lid) const override {
(void) lid;
- return IGetLid::LockGuard(_updateLock);
+ return IGetLid::unique_lock(_updateLock);
}
// Implements IGetLid API
@@ -160,12 +160,12 @@ public:
return LidInfo();
}
}
- FileId getActiveFileId(const LockGuard & guard) const {
+ FileId getActiveFileId(const MonitorGuard & guard) const {
assert(hasUpdateLock(guard));
(void) guard;
return _active;
}
- bool hasUpdateLock(const LockGuard & guard) const {
+ bool hasUpdateLock(const MonitorGuard & guard) const {
return (guard.mutex() == &_updateLock) && guard.owns_lock();
}
@@ -188,7 +188,7 @@ private:
class FileChunkHolder;
// Implements ISetLid API
- void setLid(const ISetLid::LockGuard & guard, uint32_t lid, const LidInfo & lm) override;
+ void setLid(const ISetLid::unique_lock & guard, uint32_t lid, const LidInfo & lm) override;
void compactWorst(double bloatLimit, double spreadLimit, bool prioritizeDiskBloat);
void compactFile(FileId chunkId);
@@ -209,25 +209,25 @@ private:
bool isTotalDiskBloatExceeded(size_t diskFootPrint, size_t bloat) const;
NameIdSet scanDir(const vespalib::string &dir, const vespalib::string &suffix);
- FileId allocateFileId(const LockGuard & guard);
- void setNewFileChunk(const LockGuard & guard, FileChunk::UP fileChunk);
+ FileId allocateFileId(const MonitorGuard & guard);
+ void setNewFileChunk(const MonitorGuard & guard, FileChunk::UP fileChunk);
vespalib::string ls(const NameIdSet & partList);
- WriteableFileChunk & getActive(const LockGuard & guard) {
+ WriteableFileChunk & getActive(const MonitorGuard & guard) {
assert(hasUpdateLock(guard));
return static_cast<WriteableFileChunk &>(*_fileChunks[_active.getId()]);
}
- const WriteableFileChunk & getActive(const LockGuard & guard) const {
+ const WriteableFileChunk & getActive(const MonitorGuard & guard) const {
assert(hasUpdateLock(guard));
return static_cast<const WriteableFileChunk &>(*_fileChunks[_active.getId()]);
}
- const FileChunk * getPrevActive(const LockGuard & guard) const {
+ const FileChunk * getPrevActive(const MonitorGuard & guard) const {
assert(hasUpdateLock(guard));
return ( !_prevActive.isActive() ) ? _fileChunks[_prevActive.getId()].get() : nullptr;
}
- void setActive(const LockGuard & guard, FileId fileId) {
+ void setActive(const MonitorGuard & guard, FileId fileId) {
assert(hasUpdateLock(guard));
_prevActive = _active;
_active = fileId;
@@ -242,7 +242,7 @@ private:
vespalib::string createDatFileName(NameId id) const;
vespalib::string createIdxFileName(NameId id) const;
- void requireSpace(LockGuard guard, WriteableFileChunk & active);
+ void requireSpace(MonitorGuard guard, WriteableFileChunk & active);
bool isReadOnly() const { return _readOnly; }
void updateSerialNum();
@@ -259,17 +259,17 @@ private:
*/
void unholdFileChunk(FileId fileId);
- SerialNum flushFile(LockGuard guard, WriteableFileChunk & file, SerialNum syncToken);
+ SerialNum flushFile(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken);
SerialNum flushActive(SerialNum syncToken);
void flushActiveAndWait(SerialNum syncToken);
- void flushFileAndWait(LockGuard guard, WriteableFileChunk & file, SerialNum syncToken);
+ void flushFileAndWait(MonitorGuard guard, WriteableFileChunk & file, SerialNum syncToken);
SerialNum getMinLastPersistedSerialNum() const {
return (_fileChunks.empty() ? 0 : _fileChunks.back()->getLastPersistedSerialNum());
}
bool shouldCompactToActiveFile(size_t compactedSize) const;
std::pair<bool, FileId> findNextToCompact(double bloatLimit, double spreadLimit, bool prioritizeDiskBloat);
void incGeneration();
- bool canShrinkLidSpace(const LockGuard &guard) const;
+ bool canShrinkLidSpace(const MonitorGuard &guard) const;
typedef std::vector<FileId> FileIdxVector;
Config _config;
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
index d7711b61d78..6418305e01f 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.cpp
@@ -18,7 +18,8 @@ StoreByBucket::StoreByBucket(MemoryDataStore & backingMemory, Executor & executo
_where(),
_backingMemory(backingMemory),
_executor(executor),
- _monitor(),
+ _lock(std::make_unique<std::mutex>()),
+ _cond(std::make_unique<std::condition_variable>()),
_numChunksPosted(0),
_chunks(),
_compression(compression)
@@ -50,7 +51,7 @@ StoreByBucket::createChunk()
size_t
StoreByBucket::getChunkCount() const {
- vespalib::LockGuard guard(_monitor);
+ std::lock_guard guard(*_lock);
return _chunks.size();
}
@@ -61,24 +62,24 @@ StoreByBucket::closeChunk(Chunk::UP chunk)
chunk->pack(1, buffer, _compression);
buffer.shrink(buffer.getDataLen());
ConstBufferRef bufferRef(_backingMemory.push_back(buffer.getData(), buffer.getDataLen()).data(), buffer.getDataLen());
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(*_lock);
_chunks[chunk->getId()] = bufferRef;
if (_numChunksPosted == _chunks.size()) {
- guard.signal();
+ _cond->notify_one();
}
}
void
StoreByBucket::incChunksPosted() {
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(*_lock);
_numChunksPosted++;
}
void
StoreByBucket::waitAllProcessed() {
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(*_lock);
while (_numChunksPosted != _chunks.size()) {
- guard.wait();
+ _cond->wait(guard);
}
}
diff --git a/searchlib/src/vespa/searchlib/docstore/storebybucket.h b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
index 1365dcb4416..0716a7c3ffc 100644
--- a/searchlib/src/vespa/searchlib/docstore/storebybucket.h
+++ b/searchlib/src/vespa/searchlib/docstore/storebybucket.h
@@ -6,9 +6,9 @@
#include <vespa/document/bucket/bucketid.h>
#include <vespa/vespalib/data/memorydatastore.h>
#include <vespa/vespalib/util/executor.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/stllike/hash_map.h>
#include <map>
+#include <condition_variable>
namespace search::docstore {
@@ -72,7 +72,8 @@ private:
std::map<uint64_t, IndexVector> _where;
MemoryDataStore & _backingMemory;
Executor & _executor;
- vespalib::Monitor _monitor;
+ std::unique_ptr<std::mutex> _lock;
+ std::unique_ptr<std::condition_variable> _cond;
size_t _numChunksPosted;
vespalib::hash_map<uint64_t, ConstBufferRef> _chunks;
CompressionConfig _compression;
diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
index cdf6220dfbc..8f03bf9e791 100644
--- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.cpp
@@ -17,8 +17,6 @@ LOG_SETUP(".search.writeablefilechunk");
using vespalib::makeLambdaTask;
using vespalib::FileHeader;
using vespalib::make_string;
-using vespalib::LockGuard;
-using vespalib::MonitorGuard;
using vespalib::nbostream;
using vespalib::IllegalHeaderException;
using vespalib::GenerationHandler;
@@ -89,6 +87,7 @@ WriteableFileChunk(vespalib::Executor &executor,
_serialNum(initialSerialNum),
_frozen(false),
_lock(),
+ _cond(),
_writeLock(),
_flushLock(),
_dataFile(_dataFileName.c_str()),
@@ -105,6 +104,8 @@ WriteableFileChunk(vespalib::Executor &executor,
_maxChunkSize(0x100000),
_firstChunkIdToBeWritten(0),
_writeTaskIsRunning(false),
+ _writeMonitor(),
+ _writeCond(),
_executor(executor),
_bucketMap(bucketizer)
{
@@ -171,7 +172,7 @@ WriteableFileChunk::~WriteableFileChunk()
}
size_t
-WriteableFileChunk::updateLidMap(const LockGuard &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit)
+WriteableFileChunk::updateLidMap(const unique_lock &guard, ISetLid &ds, uint64_t serialNum, uint32_t docIdLimit)
{
size_t sz = FileChunk::updateLidMap(guard, ds, serialNum, docIdLimit);
_nextChunkId = _chunkInfo.size();
@@ -220,7 +221,7 @@ WriteableFileChunk::read(LidInfoWithLidV::const_iterator begin, size_t count, IB
vespalib::hash_map<uint32_t, ChunkInfo> chunksOnFile;
std::vector<LidAndBuffer> buffers;
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
for (size_t i(0); i < count; i++) {
const LidInfoWithLid & li = *(begin + i);
uint32_t chunk = li.getChunkId();
@@ -260,7 +261,7 @@ WriteableFileChunk::read(uint32_t lid, SubChunkId chunkId, vespalib::DataBuffer
{
ChunkInfo chunkInfo;
if (!frozen()) {
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
if ((chunkId >= _chunkInfo.size()) || !_chunkInfo[chunkId].valid()) {
auto found = _chunkMap.find(chunkId);
if (found != _chunkMap.end()) {
@@ -282,7 +283,7 @@ WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum)
{
Chunk * active(nullptr);
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
active = _chunkMap[chunkId].get();
}
@@ -298,7 +299,7 @@ WriteableFileChunk::internalFlush(uint32_t chunkId, uint64_t serialNum)
tmp->getBuf().moveFreeToData(padAfter);
}
{
- vespalib::LockGuard innerGuard(_lock);
+ std::lock_guard innerGuard(_lock);
setDiskFootprint(FileChunk::getDiskFootprint() + tmp->getBuf().getDataLen());
}
enque(std::move(tmp));
@@ -308,16 +309,16 @@ void
WriteableFileChunk::enque(ProcessedChunkUP tmp)
{
LOG(debug, "enqueing %p", tmp.get());
- MonitorGuard guard(_writeMonitor);
+ std::unique_lock guard(_writeMonitor);
_writeQ.push_back(std::move(tmp));
if ( ! _writeTaskIsRunning) {
_writeTaskIsRunning = true;
uint32_t nextChunkId = _firstChunkIdToBeWritten;
- guard.signal();
guard.unlock();
+ _writeCond.notify_one();
restart(nextChunkId);
} else {
- guard.signal();
+ _writeCond.notify_one();
}
}
@@ -356,12 +357,13 @@ getAlignedStartPos(FastOS_File & file)
}
WriteableFileChunk::ProcessedChunkQ
-WriteableFileChunk::drainQ(MonitorGuard & guard)
+WriteableFileChunk::drainQ(unique_lock & guard)
{
+ assert(guard.mutex() == &_writeMonitor && guard.owns_lock());
ProcessedChunkQ newChunks;
newChunks.swap(_writeQ);
if ( ! newChunks.empty() ) {
- guard.broadcast();
+ _writeCond.notify_one();
}
return newChunks;
}
@@ -396,11 +398,11 @@ WriteableFileChunk::fetchNextChain(ProcessedChunkMap & orderedChunks, const uint
}
ChunkMeta
-WriteableFileChunk::computeChunkMeta(const vespalib::LockGuard & guard,
+WriteableFileChunk::computeChunkMeta(const unique_lock & guard,
const GenerationHandler::Guard & bucketizerGuard,
size_t offset, const ProcessedChunk & tmp, const Chunk & active)
{
- assert(guard.locks(_lock));
+ assert((guard.mutex() == &_lock) && guard.owns_lock());
size_t dataLen = tmp.getBuf().getDataLen();
const ChunkMeta cmeta(offset, tmp.getPayLoad(), active.getLastSerial(), active.count());
assert((size_t(tmp.getBuf().getData())%_alignment) == 0);
@@ -431,8 +433,7 @@ WriteableFileChunk::computeChunkMeta(ProcessedChunkQ & chunks, size_t startPos,
ChunkMetaV cmetaV;
cmetaV.reserve(chunks.size());
uint64_t lastSerial(_lastPersistedSerialNum);
- (void) lastSerial;
- vespalib::LockGuard guard(_lock);
+ std::unique_lock guard(_lock);
if (!_pendingChunks.empty()) {
const PendingChunk & pc = *_pendingChunks.back();
@@ -468,7 +469,7 @@ WriteableFileChunk::writeData(const ProcessedChunkQ & chunks, size_t sz)
buf.writeBytes(chunk->getBuf().getData(), chunk->getBuf().getDataLen());
}
- LockGuard guard(_writeLock);
+ std::lock_guard guard(_writeLock);
ssize_t wlen = _dataFile.Write2(buf.getData(), buf.getDataLen());
if (wlen != static_cast<ssize_t>(buf.getDataLen())) {
throw SummaryException(make_string("Failed writing %ld bytes to dat file. Only %ld written",
@@ -481,7 +482,7 @@ WriteableFileChunk::writeData(const ProcessedChunkQ & chunks, size_t sz)
void
WriteableFileChunk::updateChunkInfo(const ProcessedChunkQ & chunks, const ChunkMetaV & cmetaV, size_t sz)
{
- MonitorGuard guard(_lock);
+ std::lock_guard guard(_lock);
size_t nettoSz(sz);
for (size_t i(0); i < chunks.size(); i++) {
const ProcessedChunk & chunk = *chunks[i];
@@ -496,7 +497,7 @@ WriteableFileChunk::updateChunkInfo(const ProcessedChunkQ & chunks, const ChunkM
_chunkMap.erase(_chunkMap.begin());
}
setDiskFootprint(FileChunk::getDiskFootprint() - nettoSz);
- guard.broadcast();
+ _cond.notify_all();
}
void
@@ -505,7 +506,7 @@ WriteableFileChunk::fileWriter(const uint32_t firstChunkId)
LOG(debug, "Starting the filewriter with chunkid = %d", firstChunkId);
uint32_t nextChunkId(firstChunkId);
bool done(false);
- MonitorGuard guard(_writeMonitor);
+ std::unique_lock guard(_writeMonitor);
{
for (ProcessedChunkQ newChunks(drainQ(guard)); !newChunks.empty(); newChunks = drainQ(guard)) {
guard.unlock();
@@ -518,7 +519,7 @@ WriteableFileChunk::fileWriter(const uint32_t firstChunkId)
writeData(chunks, sz);
updateChunkInfo(chunks, cmetaV, sz);
LOG(spam, "bucket spread = '%3.2f'", getBucketSpread());
- guard = MonitorGuard(_writeMonitor);
+ guard = std::unique_lock(_writeMonitor);
if (done) break;
}
}
@@ -532,7 +533,7 @@ WriteableFileChunk::fileWriter(const uint32_t firstChunkId)
(void) cm;
assert(cm.valid() && cm.getSize() != 0);
}
- guard.broadcast();
+ _writeCond.notify_all();
} else {
_firstChunkIdToBeWritten = nextChunkId;
}
@@ -541,7 +542,7 @@ WriteableFileChunk::fileWriter(const uint32_t firstChunkId)
vespalib::system_time
WriteableFileChunk::getModificationTime() const
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
return _modificationTime;
}
@@ -552,15 +553,15 @@ WriteableFileChunk::freeze()
waitForAllChunksFlushedToDisk();
enque(ProcessedChunkUP());
{
- MonitorGuard guard(_writeMonitor);
+ std::unique_lock guard(_writeMonitor);
while (_writeTaskIsRunning) {
- guard.wait(10);
+ _writeCond.wait_for(guard, 10ms);
}
}
assert(_writeQ.empty());
assert(_chunkMap.empty());
{
- MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
setDiskFootprint(getDiskFootprint(guard));
_frozen = true;
}
@@ -576,16 +577,15 @@ WriteableFileChunk::getDiskFootprint() const
return FileChunk::getDiskFootprint();
} else {
// Double checked locking.
- MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
return getDiskFootprint(guard);
}
}
size_t
-WriteableFileChunk::getDiskFootprint(const vespalib::MonitorGuard & guard) const
+WriteableFileChunk::getDiskFootprint(const unique_lock & guard) const
{
- (void) guard;
- assert(guard.monitors(_lock));
+ assert(guard.mutex() == &_lock && guard.owns_lock());
return frozen()
? FileChunk::getDiskFootprint()
: _currentDiskFootprint + FileChunk::getDiskFootprint();
@@ -595,7 +595,7 @@ size_t
WriteableFileChunk::getMemoryFootprint() const
{
size_t sz(0);
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
for (const auto & it : _chunkMap) {
sz += it.second->size();
}
@@ -613,7 +613,7 @@ WriteableFileChunk::getMemoryMetaFootprint() const
vespalib::MemoryUsage
WriteableFileChunk::getMemoryUsage() const
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
vespalib::MemoryUsage result;
for (const auto &chunk : _chunkMap) {
result.merge(chunk.second->getMemoryUsage());
@@ -628,11 +628,11 @@ WriteableFileChunk::getMemoryUsage() const
int32_t WriteableFileChunk::flushLastIfNonEmpty(bool force)
{
int32_t chunkId(-1);
- MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
for (bool ready(false); !ready;) {
if (_chunkMap.size() > 1000) {
LOG(debug, "Summary write overload at least 1000 outstanding chunks. Suspending.");
- guard.wait();
+ _cond.wait(guard);
LOG(debug, "Summary write overload eased off. Commencing.");
} else {
ready = true;
@@ -656,7 +656,7 @@ WriteableFileChunk::flush(bool block, uint64_t syncToken)
_executor.execute(makeLambdaTask([this, chunkId, serialNum=_serialNum] { internalFlush(chunkId, serialNum); }));
} else {
if (block) {
- MonitorGuard guard(_lock);
+ std::lock_guard guard(_lock);
if (!_chunkMap.empty()) {
chunkId = _chunkMap.rbegin()->first;
}
@@ -672,7 +672,7 @@ WriteableFileChunk::waitForDiskToCatchUpToNow() const
{
int32_t chunkId(-1);
{
- MonitorGuard guard(_lock);
+ std::lock_guard guard(_lock);
if (!_chunkMap.empty()) {
chunkId = _chunkMap.rbegin()->first;
}
@@ -683,18 +683,18 @@ WriteableFileChunk::waitForDiskToCatchUpToNow() const
void
WriteableFileChunk::waitForChunkFlushedToDisk(uint32_t chunkId) const
{
- MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
while( _chunkMap.find(chunkId) != _chunkMap.end() ) {
- guard.wait();
+ _cond.wait(guard);
}
}
void
WriteableFileChunk::waitForAllChunksFlushedToDisk() const
{
- MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
while( ! _chunkMap.empty() ) {
- guard.wait();
+ _cond.wait(guard);
}
}
@@ -811,15 +811,15 @@ WriteableFileChunk::writeIdxHeader(const FileHeaderContext &fileHeaderContext, u
bool
WriteableFileChunk::needFlushPendingChunks(uint64_t serialNum, uint64_t datFileLen) {
- MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
return needFlushPendingChunks(guard, serialNum, datFileLen);
}
bool
-WriteableFileChunk::needFlushPendingChunks(const MonitorGuard & guard, uint64_t serialNum, uint64_t datFileLen)
+WriteableFileChunk::needFlushPendingChunks(const unique_lock & guard, uint64_t serialNum, uint64_t datFileLen)
{
(void) guard;
- assert(guard.monitors(_lock));
+ assert(guard.mutex() == &_lock && guard.owns_lock());
if (_pendingChunks.empty())
return false;
const PendingChunk & pc = *_pendingChunks.front();
@@ -851,12 +851,12 @@ WriteableFileChunk::flushPendingChunks(uint64_t serialNum) {
if (needFlushPendingChunks(serialNum, datFileLen)) {
timeStamp = unconditionallyFlushPendingChunks(flushGuard, serialNum, datFileLen);
}
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
_modificationTime = std::max(timeStamp, _modificationTime);
}
vespalib::system_time
-WriteableFileChunk::unconditionallyFlushPendingChunks(const LockGuard &flushGuard, uint64_t serialNum, uint64_t datFileLen)
+WriteableFileChunk::unconditionallyFlushPendingChunks(const unique_lock &flushGuard, uint64_t serialNum, uint64_t datFileLen)
{
assert((flushGuard.mutex() == &_flushLock) && flushGuard.owns_lock());
if ( ! _dataFile.Sync()) {
@@ -865,7 +865,7 @@ WriteableFileChunk::unconditionallyFlushPendingChunks(const LockGuard &flushGuar
nbostream os;
uint64_t lastSerial = 0;
{
- MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
lastSerial = _lastPersistedSerialNum;
for (;;) {
if (!needFlushPendingChunks(guard, serialNum, datFileLen))
diff --git a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
index cdbbd51ec77..901e70269ec 100644
--- a/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
+++ b/searchlib/src/vespa/searchlib/docstore/writeablefilechunk.h
@@ -4,11 +4,11 @@
#include "filechunk.h"
#include <vespa/vespalib/util/executor.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/searchlib/transactionlog/syncproxy.h>
#include <vespa/fastos/file.h>
#include <map>
#include <deque>
+#include <condition_variable>
namespace search {
@@ -64,7 +64,7 @@ public:
size_t getMemoryFootprint() const override;
size_t getMemoryMetaFootprint() const override;
vespalib::MemoryUsage getMemoryUsage() const override;
- size_t updateLidMap(const LockGuard &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit) override;
+ size_t updateLidMap(const unique_lock &guard, ISetLid &lidMap, uint64_t serialNum, uint32_t docIdLimit) override;
void waitForDiskToCatchUpToNow() const;
void flushPendingChunks(uint64_t serialNum);
DataStoreFileChunkStats getStats() const override;
@@ -85,30 +85,31 @@ private:
int32_t flushLastIfNonEmpty(bool force);
// _writeMonitor should not be held when calling restart
void restart(uint32_t nextChunkId);
- ProcessedChunkQ drainQ(vespalib::MonitorGuard & guard);
+ ProcessedChunkQ drainQ(unique_lock & guard);
void readDataHeader();
void readIdxHeader(FastOS_FileInterface & idxFile);
void writeDataHeader(const common::FileHeaderContext &fileHeaderContext);
bool needFlushPendingChunks(uint64_t serialNum, uint64_t datFileLen);
- bool needFlushPendingChunks(const vespalib::MonitorGuard & guard, uint64_t serialNum, uint64_t datFileLen);
- vespalib::system_time unconditionallyFlushPendingChunks(const LockGuard & flushGuard, uint64_t serialNum, uint64_t datFileLen);
- static void insertChunks(ProcessedChunkMap & orderedChunks, ProcessedChunkQ & newChunks, const uint32_t nextChunkId);
- static ProcessedChunkQ fetchNextChain(ProcessedChunkMap & orderedChunks, const uint32_t firstChunkId);
- ChunkMeta computeChunkMeta(const vespalib::LockGuard & guard,
+ bool needFlushPendingChunks(const unique_lock & guard, uint64_t serialNum, uint64_t datFileLen);
+ vespalib::system_time unconditionallyFlushPendingChunks(const unique_lock & flushGuard, uint64_t serialNum, uint64_t datFileLen);
+ static void insertChunks(ProcessedChunkMap & orderedChunks, ProcessedChunkQ & newChunks, uint32_t nextChunkId);
+ static ProcessedChunkQ fetchNextChain(ProcessedChunkMap & orderedChunks, uint32_t firstChunkId);
+ ChunkMeta computeChunkMeta(const unique_lock & guard,
const vespalib::GenerationHandler::Guard & bucketizerGuard,
size_t offset, const ProcessedChunk & tmp, const Chunk & active);
ChunkMetaV computeChunkMeta(ProcessedChunkQ & chunks, size_t startPos, size_t & sz, bool & done);
void writeData(const ProcessedChunkQ & chunks, size_t sz);
void updateChunkInfo(const ProcessedChunkQ & chunks, const ChunkMetaV & cmetaV, size_t sz);
void updateCurrentDiskFootprint();
- size_t getDiskFootprint(const vespalib::MonitorGuard & guard) const;
+ size_t getDiskFootprint(const unique_lock & guard) const;
std::unique_ptr<FastOS_FileInterface> openIdx();
Config _config;
SerialNum _serialNum;
bool _frozen;
// Lock order is _writeLock, _flushLock, _lock
- vespalib::Monitor _lock;
+ mutable std::mutex _lock;
+ mutable std::condition_variable _cond;
std::mutex _writeLock;
std::mutex _flushLock;
FastOS_File _dataFile;
@@ -127,7 +128,8 @@ private:
size_t _maxChunkSize;
uint32_t _firstChunkIdToBeWritten;
bool _writeTaskIsRunning;
- vespalib::Monitor _writeMonitor;
+ std::mutex _writeMonitor;
+ std::condition_variable _writeCond;
ProcessedChunkQ _writeQ;
vespalib::Executor & _executor;
ProcessedChunkMap _orderedChunks;