diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-14 14:25:32 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-14 14:25:32 +0000 |
commit | dece3a04316d628e6e9d2bda3e03f241db32cba9 (patch) | |
tree | 15cb36503a3fc377392f6cd4b68afd79ef9f881d /searchlib | |
parent | 5ab1a8b97842b3a87fc0d3fec5cb631b1d356ab9 (diff) |
vespalib::Monitor -> std:.mutex/std::condition_variable
Diffstat (limited to 'searchlib')
3 files changed, 30 insertions, 28 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 454293dfc84..9efc68bc8ec 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -113,8 +113,8 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } Domain::~Domain() { - MonitorGuard guard(_currentChunkMonitor); - guard.broadcast(); + std::unique_lock guard(_currentChunkMonitor); + _currentChunkCond.notify_all(); commitChunk(grabCurrentChunk(guard), guard); _singleCommitter->shutdown().sync(); } @@ -122,7 +122,7 @@ Domain::~Domain() { DomainInfo Domain::getDomainInfo() const { - UniqueLock guard(_lock); + std::unique_lock guard(_lock); DomainInfo info(SerialNumRange(begin(guard), end(guard)), size(guard), byteSize(guard), _maxSessionRunTime); for (const auto &entry: _parts) { const DomainPart &part = *entry.second; @@ -211,17 +211,17 @@ void Domain::triggerSyncNow() { { - vespalib::MonitorGuard guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMonitor); commitAndTransferResponses(guard); } - MonitorGuard guard(_syncMonitor); + std::unique_lock guard(_syncMonitor); if (!_pendingSync) { _pendingSync = true; _executor.execute(makeLambdaTask([this, domainPart=_parts.rbegin()->second]() { domainPart->sync(); - MonitorGuard monitorGuard(_syncMonitor); + std::lock_guard monitorGuard(_syncMonitor); _pendingSync = false; - monitorGuard.broadcast(); + _syncCond.notify_all(); })); } } @@ -297,11 +297,11 @@ Domain::cleanSessions() namespace { void -waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) +waitPendingSync(std::mutex &syncMonitor, std::condition_variable & syncCond, bool &pendingSync) { - MonitorGuard guard(syncMonitor); + std::unique_lock guard(syncMonitor); while (pendingSync) { - guard.wait(); + syncCond.wait(guard); } } @@ -311,9 +311,9 @@ DomainPart::SP Domain::optionallyRotateFile(SerialNum serialNum) { DomainPart::SP dp(_parts.rbegin()->second); if (dp->byteSize() > _config.getPartSizeLimit()) { - waitPendingSync(_syncMonitor, _pendingSync); + waitPendingSync(_syncMonitor, _syncCond, _pendingSync); triggerSyncNow(); - waitPendingSync(_syncMonitor, _pendingSync); + waitPendingSync(_syncMonitor, _syncCond, _pendingSync); dp->close(); dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _config.getEncoding(), _config.getCompressionlevel(), _fileHeaderContext, false); @@ -329,7 +329,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) { void Domain::append(const Packet & packet, Writer::DoneCallback onDone) { - vespalib::MonitorGuard guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMonitor); if (_lastSerial >= packet.range().from()) { throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", packet.range().from(), _lastSerial)); @@ -342,7 +342,7 @@ Domain::append(const Packet & packet, Writer::DoneCallback onDone) { Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - vespalib::MonitorGuard guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMonitor); if ( !_currentChunk->empty() ) { auto completed = grabCurrentChunk(guard); completed->setCommitDoneCallback(std::move(onDone)); @@ -354,30 +354,30 @@ Domain::startCommit(DoneCallback onDone) { } void -Domain::commitIfFull(const vespalib::MonitorGuard &guard) { +Domain::commitIfFull(const UniqueLock &guard) { if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { commitAndTransferResponses(guard); } } void -Domain::commitAndTransferResponses(const vespalib::MonitorGuard &guard) { +Domain::commitAndTransferResponses(const UniqueLock &guard) { auto completed = std::move(_currentChunk); _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks()); commitChunk(std::move(completed), guard); } std::unique_ptr<CommitChunk> -Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { - assert(guard.monitors(_currentChunkMonitor)); +Domain::grabCurrentChunk(const UniqueLock & guard) { + assert(guard.mutex() == &_currentChunkMonitor && guard.owns_lock()); auto chunk = std::move(_currentChunk); _currentChunk = createCommitChunk(_config); return chunk; } void -Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { - assert(chunkOrderGuard.monitors(_currentChunkMonitor)); +Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) { + assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock()); _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { doCommit(std::move(chunk)); })); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index a883156f32c..432787f8619 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -2,9 +2,10 @@ #pragma once #include "domainconfig.h" -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/threadexecutor.h> #include <atomic> +#include <mutex> +#include <condition_variable> namespace search::common { class FileHeaderContext; } namespace search::transactionlog { @@ -58,11 +59,11 @@ public: private: using UniqueLock = std::unique_lock<std::mutex>; void verifyLock(const UniqueLock & guard) const; - void commitIfFull(const vespalib::MonitorGuard & guard); - void commitAndTransferResponses(const vespalib::MonitorGuard & guard); + void commitIfFull(const UniqueLock & guard); + void commitAndTransferResponses(const UniqueLock & guard); - std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); - void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); + std::unique_ptr<CommitChunk> grabCurrentChunk(const UniqueLock & guard); + void commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard); void doCommit(std::unique_ptr<CommitChunk> chunk); SerialNum begin(const UniqueLock & guard) const; SerialNum end(const UniqueLock & guard) const; @@ -87,12 +88,14 @@ private: std::unique_ptr<Executor> _singleCommitter; Executor &_executor; std::atomic<int> _sessionId; - vespalib::Monitor _syncMonitor; + std::mutex _syncMonitor; + std::condition_variable _syncCond; bool _pendingSync; vespalib::string _name; DomainPartList _parts; mutable std::mutex _lock; - vespalib::Monitor _currentChunkMonitor; + std::mutex _currentChunkMonitor; + std::condition_variable _currentChunkCond; mutable std::mutex _sessionLock; SessionList _sessions; DurationSeconds _maxSessionRunTime; diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index 3aedeb11121..edfd7367a1f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -8,7 +8,6 @@ #include <vespa/log/log.h> LOG_SETUP(".transactionlog.session"); -using vespalib::LockGuard; namespace search::transactionlog { |