summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-14 14:25:32 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-14 14:25:32 +0000
commitdece3a04316d628e6e9d2bda3e03f241db32cba9 (patch)
tree15cb36503a3fc377392f6cd4b68afd79ef9f881d /searchlib
parent5ab1a8b97842b3a87fc0d3fec5cb631b1d356ab9 (diff)
vespalib::Monitor -> std:.mutex/std::condition_variable
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp40
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h17
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp1
3 files changed, 30 insertions, 28 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 454293dfc84..9efc68bc8ec 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -113,8 +113,8 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
}
Domain::~Domain() {
- MonitorGuard guard(_currentChunkMonitor);
- guard.broadcast();
+ std::unique_lock guard(_currentChunkMonitor);
+ _currentChunkCond.notify_all();
commitChunk(grabCurrentChunk(guard), guard);
_singleCommitter->shutdown().sync();
}
@@ -122,7 +122,7 @@ Domain::~Domain() {
DomainInfo
Domain::getDomainInfo() const
{
- UniqueLock guard(_lock);
+ std::unique_lock guard(_lock);
DomainInfo info(SerialNumRange(begin(guard), end(guard)), size(guard), byteSize(guard), _maxSessionRunTime);
for (const auto &entry: _parts) {
const DomainPart &part = *entry.second;
@@ -211,17 +211,17 @@ void
Domain::triggerSyncNow()
{
{
- vespalib::MonitorGuard guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMonitor);
commitAndTransferResponses(guard);
}
- MonitorGuard guard(_syncMonitor);
+ std::unique_lock guard(_syncMonitor);
if (!_pendingSync) {
_pendingSync = true;
_executor.execute(makeLambdaTask([this, domainPart=_parts.rbegin()->second]() {
domainPart->sync();
- MonitorGuard monitorGuard(_syncMonitor);
+ std::lock_guard monitorGuard(_syncMonitor);
_pendingSync = false;
- monitorGuard.broadcast();
+ _syncCond.notify_all();
}));
}
}
@@ -297,11 +297,11 @@ Domain::cleanSessions()
namespace {
void
-waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
+waitPendingSync(std::mutex &syncMonitor, std::condition_variable & syncCond, bool &pendingSync)
{
- MonitorGuard guard(syncMonitor);
+ std::unique_lock guard(syncMonitor);
while (pendingSync) {
- guard.wait();
+ syncCond.wait(guard);
}
}
@@ -311,9 +311,9 @@ DomainPart::SP
Domain::optionallyRotateFile(SerialNum serialNum) {
DomainPart::SP dp(_parts.rbegin()->second);
if (dp->byteSize() > _config.getPartSizeLimit()) {
- waitPendingSync(_syncMonitor, _pendingSync);
+ waitPendingSync(_syncMonitor, _syncCond, _pendingSync);
triggerSyncNow();
- waitPendingSync(_syncMonitor, _pendingSync);
+ waitPendingSync(_syncMonitor, _syncCond, _pendingSync);
dp->close();
dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _config.getEncoding(),
_config.getCompressionlevel(), _fileHeaderContext, false);
@@ -329,7 +329,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
void
Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
- vespalib::MonitorGuard guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMonitor);
if (_lastSerial >= packet.range().from()) {
throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
packet.range().from(), _lastSerial));
@@ -342,7 +342,7 @@ Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
Domain::CommitResult
Domain::startCommit(DoneCallback onDone) {
- vespalib::MonitorGuard guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMonitor);
if ( !_currentChunk->empty() ) {
auto completed = grabCurrentChunk(guard);
completed->setCommitDoneCallback(std::move(onDone));
@@ -354,30 +354,30 @@ Domain::startCommit(DoneCallback onDone) {
}
void
-Domain::commitIfFull(const vespalib::MonitorGuard &guard) {
+Domain::commitIfFull(const UniqueLock &guard) {
if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) {
commitAndTransferResponses(guard);
}
}
void
-Domain::commitAndTransferResponses(const vespalib::MonitorGuard &guard) {
+Domain::commitAndTransferResponses(const UniqueLock &guard) {
auto completed = std::move(_currentChunk);
_currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks());
commitChunk(std::move(completed), guard);
}
std::unique_ptr<CommitChunk>
-Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
- assert(guard.monitors(_currentChunkMonitor));
+Domain::grabCurrentChunk(const UniqueLock & guard) {
+ assert(guard.mutex() == &_currentChunkMonitor && guard.owns_lock());
auto chunk = std::move(_currentChunk);
_currentChunk = createCommitChunk(_config);
return chunk;
}
void
-Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) {
- assert(chunkOrderGuard.monitors(_currentChunkMonitor));
+Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) {
+ assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock());
_singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable {
doCommit(std::move(chunk));
}));
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index a883156f32c..432787f8619 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -2,9 +2,10 @@
#pragma once
#include "domainconfig.h"
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/threadexecutor.h>
#include <atomic>
+#include <mutex>
+#include <condition_variable>
namespace search::common { class FileHeaderContext; }
namespace search::transactionlog {
@@ -58,11 +59,11 @@ public:
private:
using UniqueLock = std::unique_lock<std::mutex>;
void verifyLock(const UniqueLock & guard) const;
- void commitIfFull(const vespalib::MonitorGuard & guard);
- void commitAndTransferResponses(const vespalib::MonitorGuard & guard);
+ void commitIfFull(const UniqueLock & guard);
+ void commitAndTransferResponses(const UniqueLock & guard);
- std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
- void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
+ std::unique_ptr<CommitChunk> grabCurrentChunk(const UniqueLock & guard);
+ void commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard);
void doCommit(std::unique_ptr<CommitChunk> chunk);
SerialNum begin(const UniqueLock & guard) const;
SerialNum end(const UniqueLock & guard) const;
@@ -87,12 +88,14 @@ private:
std::unique_ptr<Executor> _singleCommitter;
Executor &_executor;
std::atomic<int> _sessionId;
- vespalib::Monitor _syncMonitor;
+ std::mutex _syncMonitor;
+ std::condition_variable _syncCond;
bool _pendingSync;
vespalib::string _name;
DomainPartList _parts;
mutable std::mutex _lock;
- vespalib::Monitor _currentChunkMonitor;
+ std::mutex _currentChunkMonitor;
+ std::condition_variable _currentChunkCond;
mutable std::mutex _sessionLock;
SessionList _sessions;
DurationSeconds _maxSessionRunTime;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index 3aedeb11121..edfd7367a1f 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -8,7 +8,6 @@
#include <vespa/log/log.h>
LOG_SETUP(".transactionlog.session");
-using vespalib::LockGuard;
namespace search::transactionlog {