diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-03-02 08:21:25 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-03-02 08:21:25 +0000 |
commit | 8ccbd239b2e78644ebfdff8d425f42b9990b6412 (patch) | |
tree | 8db49351614eec5c512586fda975e8b771e048fe /searchlib | |
parent | 22e19a3d5dae693f36f62a2b35fab9ef0f99581c (diff) |
Since we schedule the last chunk for commit in triggerSyncNow, we can assert that we will be fully synced on the next pull when it happens in the singleCommitter thread.
That allows for further simplification.
Diffstat (limited to 'searchlib')
3 files changed, 51 insertions, 67 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index e649e48b812..c160434a465 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -44,14 +44,11 @@ Domain::Domain(const string &domainName, const string & baseDir, vespalib::Execu _singleCommitter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128_Ki)), _executor(executor), _sessionId(1), - _syncMonitor(), - _pendingSync(false), - _done_sync_tasks(), _name(domainName), _parts(), - _lock(), - _currentChunkMonitor(), - _sessionLock(), + _partsMutex(), + _currentChunkMutex(), + _sessionMutex(), _sessions(), _maxSessionRunTime(), _baseDir(baseDir), @@ -103,7 +100,7 @@ Domain::addPart(SerialNum partId, bool isLastPart) { dp->erase(dp->range().to() + 1); } else { { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); _parts[partId] = dp; } if (! isLastPart) { @@ -113,9 +110,10 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } Domain::~Domain() { - std::unique_lock guard(_currentChunkMonitor); - _currentChunkCond.notify_all(); - commitChunk(grabCurrentChunk(guard), guard); + { + std::unique_lock guard(_currentChunkMutex); + commitChunk(grabCurrentChunk(guard), guard); + } vespalib::Gate gate; _singleCommitter->execute(makeLambdaTask([callback=std::make_unique<vespalib::GateCallback>(gate)]() { (void) callback;})); gate.await(); @@ -124,7 +122,7 @@ Domain::~Domain() { DomainInfo Domain::getDomainInfo() const { - std::unique_lock guard(_lock); + std::unique_lock guard(_partsMutex); DomainInfo info(SerialNumRange(begin(guard), end(guard)), size(guard), byteSize(guard), _maxSessionRunTime); for (const auto &entry: _parts) { const DomainPart &part = *entry.second; @@ -136,12 +134,12 @@ Domain::getDomainInfo() const SerialNum Domain::begin() const { - return begin(UniqueLock(_lock)); + return begin(UniqueLock(_partsMutex)); } void Domain::verifyLock(const UniqueLock & guard) const { - assert(guard.mutex() == &_lock); + assert(guard.mutex() == &_partsMutex); assert(guard.owns_lock()); } SerialNum @@ -158,7 +156,7 @@ Domain::begin(const UniqueLock & guard) const SerialNum Domain::end() const { - return end(UniqueLock(_lock)); + return end(UniqueLock(_partsMutex)); } SerialNum @@ -175,7 +173,7 @@ Domain::end(const UniqueLock & guard) const size_t Domain::byteSize() const { - return byteSize(UniqueLock(_lock)); + return byteSize(UniqueLock(_partsMutex)); } size_t @@ -194,7 +192,7 @@ SerialNum Domain::getSynced() const { SerialNum s(0); - UniqueLock guard(_lock); + UniqueLock guard(_partsMutex); if (_parts.empty()) { return s; } @@ -210,35 +208,22 @@ Domain::getSynced() const void -Domain::triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task) +Domain::triggerSyncNow(std::unique_ptr<vespalib::IDestructorCallback> after_sync) { { - std::unique_lock guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMutex); commitAndTransferResponses(guard); } - std::unique_lock guard(_syncMonitor); - if (done_sync_task) { - _done_sync_tasks.push_back(std::move(done_sync_task)); - } - if (!_pendingSync) { - _pendingSync = true; - _singleCommitter->execute(makeLambdaTask([this, domainPart=getActivePart()]() { - domainPart->sync(); - std::lock_guard monitorGuard(_syncMonitor); - _pendingSync = false; - for (auto &task : _done_sync_tasks) { - auto failed_task = _singleCommitter->execute(std::move(task)); - assert(!failed_task); - } - _done_sync_tasks.clear(); - })); - } + _singleCommitter->execute(makeLambdaTask([after_sync=std::move(after_sync), domainPart=getActivePart()]() { + (void) after_sync; + domainPart->sync(); + })); } DomainPart::SP Domain::findPart(SerialNum s) { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); DomainPartList::iterator it(_parts.upper_bound(s)); if (!_parts.empty() && it != _parts.begin()) { DomainPartList::iterator prev(it); @@ -255,14 +240,14 @@ Domain::findPart(SerialNum s) DomainPart::SP Domain::getActivePart() { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); return _parts.rbegin()->second; } uint64_t Domain::size() const { - return size(UniqueLock(_lock)); + return size(UniqueLock(_partsMutex)); } uint64_t @@ -280,7 +265,7 @@ SerialNum Domain::findOldestActiveVisit() const { SerialNum oldestActive(std::numeric_limits<SerialNum>::max()); - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); for (const auto & pair : _sessions) { Session * session(pair.second.get()); if (!session->inSync()) { @@ -296,7 +281,7 @@ Domain::cleanSessions() if ( _sessions.empty()) { return; } - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); for (SessionList::iterator it(_sessions.begin()), mt(_sessions.end()); it != mt; ) { Session * session(it->second.get()); if (session->inSync()) { @@ -316,7 +301,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) { dp->close(); dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _fileHeaderContext, false); { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); _parts[serialNum] = dp; assert(_parts.rbegin()->first == serialNum); } @@ -327,7 +312,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) { void Domain::append(const Packet & packet, Writer::DoneCallback onDone) { - std::unique_lock guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMutex); if (_lastSerial >= packet.range().from()) { throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", packet.range().from(), _lastSerial)); @@ -340,7 +325,7 @@ Domain::append(const Packet & packet, Writer::DoneCallback onDone) { Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - std::unique_lock guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMutex); if ( !_currentChunk->empty() ) { auto completed = grabCurrentChunk(guard); completed->setCommitDoneCallback(std::move(onDone)); @@ -367,7 +352,7 @@ Domain::commitAndTransferResponses(const UniqueLock &guard) { std::unique_ptr<CommitChunk> Domain::grabCurrentChunk(const UniqueLock & guard) { - assert(guard.mutex() == &_currentChunkMonitor && guard.owns_lock()); + assert(guard.mutex() == &_currentChunkMutex && guard.owns_lock()); auto chunk = std::move(_currentChunk); _currentChunk = createCommitChunk(_config); return chunk; @@ -375,7 +360,7 @@ Domain::grabCurrentChunk(const UniqueLock & guard) { void Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) { - assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock()); + assert(chunkOrderGuard.mutex() == &_currentChunkMutex && chunkOrderGuard.owns_lock()); if (chunk->getPacket().empty()) return; chunk->shrinkPayloadToFit(); std::promise<SerializedChunk> promise; @@ -410,7 +395,7 @@ Domain::erase(SerialNum to) { bool retval(true); /// Do not erase the last element - UniqueLock guard(_lock); + UniqueLock guard(_partsMutex); for (DomainPartList::iterator it(_parts.begin()); (_parts.size() > 1) && (it->second.get()->range().to() < to); it = _parts.begin()) { DomainPart::SP dp(it->second); _parts.erase(it); @@ -433,7 +418,7 @@ Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, std::uniq SerialNumRange range(from, to); auto session = std::make_shared<Session>(_sessionId++, range, domain, std::move(dest)); int id = session->id(); - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); _sessions[id] = std::move(session); return id; } @@ -442,7 +427,7 @@ int Domain::startSession(int sessionId) { int retval(-1); - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); SessionList::iterator found = _sessions.find(sessionId); if (found != _sessions.end()) { found->second->setStartTime(vespalib::steady_clock::now()); @@ -461,7 +446,7 @@ Domain::closeSession(int sessionId) int retval(-1); DurationSeconds sessionRunTime(0); { - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); SessionList::iterator found = _sessions.find(sessionId); if (found != _sessions.end()) { sessionRunTime = (vespalib::steady_clock::now() - found->second->getStartTime()); @@ -470,7 +455,7 @@ Domain::closeSession(int sessionId) } while (retval == 1) { std::this_thread::sleep_for(10ms); - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); SessionList::iterator found = _sessions.find(sessionId); if (found != _sessions.end()) { if ( ! found->second->isVisitRunning()) { @@ -482,7 +467,7 @@ Domain::closeSession(int sessionId) } } { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); if (sessionRunTime > _maxSessionRunTime) { _maxSessionRunTime = sessionRunTime; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 261a38d8ed4..d7b59d676dd 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -35,7 +35,7 @@ public: SerialNum begin() const; SerialNum end() const; SerialNum getSynced() const; - void triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task); + void triggerSyncNow(std::unique_ptr<vespalib::IDestructorCallback> after_sync); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -80,23 +80,19 @@ private: using SessionList = std::map<int, std::shared_ptr<Session>>; using DomainPartList = std::map<SerialNum, DomainPartSP>; using DurationSeconds = std::chrono::duration<double>; - using TaskUP = std::unique_ptr<vespalib::Executor::Task>; + using Executor = vespalib::Executor; DomainConfig _config; std::unique_ptr<CommitChunk> _currentChunk; SerialNum _lastSerial; - std::unique_ptr<vespalib::Executor> _singleCommitter; - vespalib::Executor &_executor; + std::unique_ptr<Executor> _singleCommitter; + Executor &_executor; std::atomic<int> _sessionId; - std::mutex _syncMonitor; - bool _pendingSync; - std::vector<TaskUP> _done_sync_tasks; vespalib::string _name; DomainPartList _parts; - mutable std::mutex _lock; - std::mutex _currentChunkMonitor; - std::condition_variable _currentChunkCond; - mutable std::mutex _sessionLock; + mutable std::mutex _partsMutex; + std::mutex _currentChunkMutex; + mutable std::mutex _sessionMutex; SessionList _sessions; DurationSeconds _maxSessionRunTime; vespalib::string _baseDir; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index a0853dcbd86..b5806e1f962 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -9,7 +9,6 @@ #include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> #include <fstream> @@ -42,7 +41,7 @@ public: SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain::SP domain, SerialNum syncTo) noexcept; ~SyncHandler(); - void poll(); + bool poll(); }; SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain::SP domain, SerialNum syncTo) noexcept @@ -55,7 +54,7 @@ SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain: SyncHandler::~SyncHandler() = default; -void +bool SyncHandler::poll() { SerialNum synced(_domain->getSynced()); @@ -67,9 +66,13 @@ SyncHandler::poll() rvals.AddInt32(0); rvals.AddInt64(synced); _req.Return(); - } else { - _domain->triggerSyncNow(vespalib::makeLambdaTask([self = shared_from_this()]() { self->poll(); })); + return true; } + _domain->triggerSyncNow(vespalib::makeUniqueLambdaCallback([self = shared_from_this()]() { + bool completed = self->poll(); + assert(completed); + })); + return false; } VESPA_THREAD_STACK_TAG(tls_executor); |