diff options
6 files changed, 64 insertions, 73 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp index 3eda5146af1..060215c4521 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp @@ -6,7 +6,7 @@ #include <vespa/vespalib/util/destructor_callbacks.h> using vespalib::makeLambdaTask; -using vespalib::makeLambdaCallback; +using vespalib::makeSharedLambdaCallback; namespace storage::spi::dummy { @@ -32,7 +32,7 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> } _inFlight.insert(bucket.getBucket()); } - bucketTask->run(bucket, makeLambdaCallback([this, bucket]() { + bucketTask->run(bucket, makeSharedLambdaCallback([this, bucket]() { std::unique_lock guard(_lock); assert(_inFlight.contains(bucket.getBucket())); _inFlight.erase(bucket.getBucket()); diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index d5297fe8f8c..1153a09d09f 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -37,7 +37,7 @@ using search::queryeval::ISourceSelector; using search::queryeval::Source; using search::SerialNum; using vespalib::makeLambdaTask; -using vespalib::makeLambdaCallback; +using vespalib::makeSharedLambdaCallback; using std::ostringstream; using vespalib::string; using vespalib::Executor; @@ -313,7 +313,7 @@ IndexMaintainer::loadDiskIndex(const string &indexDir) _disk_indexes->setActive(indexDir, stats.sizeOnDisk()); auto retval = std::make_shared<DiskIndexWithDestructorCallback>( std::move(index), - makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }), + makeSharedLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }), _layout, *_disk_indexes); if (LOG_WOULD_LOG(event)) { EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed())); @@ -336,7 +336,7 @@ IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex) _disk_indexes->setActive(indexDir, stats.sizeOnDisk()); auto retval = std::make_shared<DiskIndexWithDestructorCallback>( std::move(index), - makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }), + makeSharedLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }), _layout, *_disk_indexes); if (LOG_WOULD_LOG(event)) { EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed())); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index e649e48b812..c160434a465 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -44,14 +44,11 @@ Domain::Domain(const string &domainName, const string & baseDir, vespalib::Execu _singleCommitter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128_Ki)), _executor(executor), _sessionId(1), - _syncMonitor(), - _pendingSync(false), - _done_sync_tasks(), _name(domainName), _parts(), - _lock(), - _currentChunkMonitor(), - _sessionLock(), + _partsMutex(), + _currentChunkMutex(), + _sessionMutex(), _sessions(), _maxSessionRunTime(), _baseDir(baseDir), @@ -103,7 +100,7 @@ Domain::addPart(SerialNum partId, bool isLastPart) { dp->erase(dp->range().to() + 1); } else { { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); _parts[partId] = dp; } if (! isLastPart) { @@ -113,9 +110,10 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } Domain::~Domain() { - std::unique_lock guard(_currentChunkMonitor); - _currentChunkCond.notify_all(); - commitChunk(grabCurrentChunk(guard), guard); + { + std::unique_lock guard(_currentChunkMutex); + commitChunk(grabCurrentChunk(guard), guard); + } vespalib::Gate gate; _singleCommitter->execute(makeLambdaTask([callback=std::make_unique<vespalib::GateCallback>(gate)]() { (void) callback;})); gate.await(); @@ -124,7 +122,7 @@ Domain::~Domain() { DomainInfo Domain::getDomainInfo() const { - std::unique_lock guard(_lock); + std::unique_lock guard(_partsMutex); DomainInfo info(SerialNumRange(begin(guard), end(guard)), size(guard), byteSize(guard), _maxSessionRunTime); for (const auto &entry: _parts) { const DomainPart &part = *entry.second; @@ -136,12 +134,12 @@ Domain::getDomainInfo() const SerialNum Domain::begin() const { - return begin(UniqueLock(_lock)); + return begin(UniqueLock(_partsMutex)); } void Domain::verifyLock(const UniqueLock & guard) const { - assert(guard.mutex() == &_lock); + assert(guard.mutex() == &_partsMutex); assert(guard.owns_lock()); } SerialNum @@ -158,7 +156,7 @@ Domain::begin(const UniqueLock & guard) const SerialNum Domain::end() const { - return end(UniqueLock(_lock)); + return end(UniqueLock(_partsMutex)); } SerialNum @@ -175,7 +173,7 @@ Domain::end(const UniqueLock & guard) const size_t Domain::byteSize() const { - return byteSize(UniqueLock(_lock)); + return byteSize(UniqueLock(_partsMutex)); } size_t @@ -194,7 +192,7 @@ SerialNum Domain::getSynced() const { SerialNum s(0); - UniqueLock guard(_lock); + UniqueLock guard(_partsMutex); if (_parts.empty()) { return s; } @@ -210,35 +208,22 @@ Domain::getSynced() const void -Domain::triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task) +Domain::triggerSyncNow(std::unique_ptr<vespalib::IDestructorCallback> after_sync) { { - std::unique_lock guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMutex); commitAndTransferResponses(guard); } - std::unique_lock guard(_syncMonitor); - if (done_sync_task) { - _done_sync_tasks.push_back(std::move(done_sync_task)); - } - if (!_pendingSync) { - _pendingSync = true; - _singleCommitter->execute(makeLambdaTask([this, domainPart=getActivePart()]() { - domainPart->sync(); - std::lock_guard monitorGuard(_syncMonitor); - _pendingSync = false; - for (auto &task : _done_sync_tasks) { - auto failed_task = _singleCommitter->execute(std::move(task)); - assert(!failed_task); - } - _done_sync_tasks.clear(); - })); - } + _singleCommitter->execute(makeLambdaTask([after_sync=std::move(after_sync), domainPart=getActivePart()]() { + (void) after_sync; + domainPart->sync(); + })); } DomainPart::SP Domain::findPart(SerialNum s) { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); DomainPartList::iterator it(_parts.upper_bound(s)); if (!_parts.empty() && it != _parts.begin()) { DomainPartList::iterator prev(it); @@ -255,14 +240,14 @@ Domain::findPart(SerialNum s) DomainPart::SP Domain::getActivePart() { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); return _parts.rbegin()->second; } uint64_t Domain::size() const { - return size(UniqueLock(_lock)); + return size(UniqueLock(_partsMutex)); } uint64_t @@ -280,7 +265,7 @@ SerialNum Domain::findOldestActiveVisit() const { SerialNum oldestActive(std::numeric_limits<SerialNum>::max()); - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); for (const auto & pair : _sessions) { Session * session(pair.second.get()); if (!session->inSync()) { @@ -296,7 +281,7 @@ Domain::cleanSessions() if ( _sessions.empty()) { return; } - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); for (SessionList::iterator it(_sessions.begin()), mt(_sessions.end()); it != mt; ) { Session * session(it->second.get()); if (session->inSync()) { @@ -316,7 +301,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) { dp->close(); dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _fileHeaderContext, false); { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); _parts[serialNum] = dp; assert(_parts.rbegin()->first == serialNum); } @@ -327,7 +312,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) { void Domain::append(const Packet & packet, Writer::DoneCallback onDone) { - std::unique_lock guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMutex); if (_lastSerial >= packet.range().from()) { throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", packet.range().from(), _lastSerial)); @@ -340,7 +325,7 @@ Domain::append(const Packet & packet, Writer::DoneCallback onDone) { Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - std::unique_lock guard(_currentChunkMonitor); + std::unique_lock guard(_currentChunkMutex); if ( !_currentChunk->empty() ) { auto completed = grabCurrentChunk(guard); completed->setCommitDoneCallback(std::move(onDone)); @@ -367,7 +352,7 @@ Domain::commitAndTransferResponses(const UniqueLock &guard) { std::unique_ptr<CommitChunk> Domain::grabCurrentChunk(const UniqueLock & guard) { - assert(guard.mutex() == &_currentChunkMonitor && guard.owns_lock()); + assert(guard.mutex() == &_currentChunkMutex && guard.owns_lock()); auto chunk = std::move(_currentChunk); _currentChunk = createCommitChunk(_config); return chunk; @@ -375,7 +360,7 @@ Domain::grabCurrentChunk(const UniqueLock & guard) { void Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) { - assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock()); + assert(chunkOrderGuard.mutex() == &_currentChunkMutex && chunkOrderGuard.owns_lock()); if (chunk->getPacket().empty()) return; chunk->shrinkPayloadToFit(); std::promise<SerializedChunk> promise; @@ -410,7 +395,7 @@ Domain::erase(SerialNum to) { bool retval(true); /// Do not erase the last element - UniqueLock guard(_lock); + UniqueLock guard(_partsMutex); for (DomainPartList::iterator it(_parts.begin()); (_parts.size() > 1) && (it->second.get()->range().to() < to); it = _parts.begin()) { DomainPart::SP dp(it->second); _parts.erase(it); @@ -433,7 +418,7 @@ Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, std::uniq SerialNumRange range(from, to); auto session = std::make_shared<Session>(_sessionId++, range, domain, std::move(dest)); int id = session->id(); - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); _sessions[id] = std::move(session); return id; } @@ -442,7 +427,7 @@ int Domain::startSession(int sessionId) { int retval(-1); - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); SessionList::iterator found = _sessions.find(sessionId); if (found != _sessions.end()) { found->second->setStartTime(vespalib::steady_clock::now()); @@ -461,7 +446,7 @@ Domain::closeSession(int sessionId) int retval(-1); DurationSeconds sessionRunTime(0); { - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); SessionList::iterator found = _sessions.find(sessionId); if (found != _sessions.end()) { sessionRunTime = (vespalib::steady_clock::now() - found->second->getStartTime()); @@ -470,7 +455,7 @@ Domain::closeSession(int sessionId) } while (retval == 1) { std::this_thread::sleep_for(10ms); - std::lock_guard guard(_sessionLock); + std::lock_guard guard(_sessionMutex); SessionList::iterator found = _sessions.find(sessionId); if (found != _sessions.end()) { if ( ! found->second->isVisitRunning()) { @@ -482,7 +467,7 @@ Domain::closeSession(int sessionId) } } { - std::lock_guard guard(_lock); + std::lock_guard guard(_partsMutex); if (sessionRunTime > _maxSessionRunTime) { _maxSessionRunTime = sessionRunTime; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 261a38d8ed4..d7b59d676dd 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -35,7 +35,7 @@ public: SerialNum begin() const; SerialNum end() const; SerialNum getSynced() const; - void triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task); + void triggerSyncNow(std::unique_ptr<vespalib::IDestructorCallback> after_sync); bool getMarkedDeleted() const { return _markedDeleted; } void markDeleted() { _markedDeleted = true; } @@ -80,23 +80,19 @@ private: using SessionList = std::map<int, std::shared_ptr<Session>>; using DomainPartList = std::map<SerialNum, DomainPartSP>; using DurationSeconds = std::chrono::duration<double>; - using TaskUP = std::unique_ptr<vespalib::Executor::Task>; + using Executor = vespalib::Executor; DomainConfig _config; std::unique_ptr<CommitChunk> _currentChunk; SerialNum _lastSerial; - std::unique_ptr<vespalib::Executor> _singleCommitter; - vespalib::Executor &_executor; + std::unique_ptr<Executor> _singleCommitter; + Executor &_executor; std::atomic<int> _sessionId; - std::mutex _syncMonitor; - bool _pendingSync; - std::vector<TaskUP> _done_sync_tasks; vespalib::string _name; DomainPartList _parts; - mutable std::mutex _lock; - std::mutex _currentChunkMonitor; - std::condition_variable _currentChunkCond; - mutable std::mutex _sessionLock; + mutable std::mutex _partsMutex; + std::mutex _currentChunkMutex; + mutable std::mutex _sessionMutex; SessionList _sessions; DurationSeconds _maxSessionRunTime; vespalib::string _baseDir; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index a0853dcbd86..b5806e1f962 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -9,7 +9,6 @@ #include <vespa/vespalib/util/cpu_usage.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/exceptions.h> -#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> #include <fstream> @@ -42,7 +41,7 @@ public: SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain::SP domain, SerialNum syncTo) noexcept; ~SyncHandler(); - void poll(); + bool poll(); }; SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain::SP domain, SerialNum syncTo) noexcept @@ -55,7 +54,7 @@ SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain: SyncHandler::~SyncHandler() = default; -void +bool SyncHandler::poll() { SerialNum synced(_domain->getSynced()); @@ -67,9 +66,13 @@ SyncHandler::poll() rvals.AddInt32(0); rvals.AddInt64(synced); _req.Return(); - } else { - _domain->triggerSyncNow(vespalib::makeLambdaTask([self = shared_from_this()]() { self->poll(); })); + return true; } + _domain->triggerSyncNow(vespalib::makeUniqueLambdaCallback([self = shared_from_this()]() { + bool completed = self->poll(); + assert(completed); + })); + return false; } VESPA_THREAD_STACK_TAG(tls_executor); diff --git a/vespalib/src/vespa/vespalib/util/destructor_callbacks.h b/vespalib/src/vespa/vespalib/util/destructor_callbacks.h index 7fc15e0185e..7452d72ea44 100644 --- a/vespalib/src/vespa/vespalib/util/destructor_callbacks.h +++ b/vespalib/src/vespa/vespalib/util/destructor_callbacks.h @@ -44,10 +44,17 @@ private: template<class FunctionType> std::shared_ptr<IDestructorCallback> -makeLambdaCallback(FunctionType &&function) { +makeSharedLambdaCallback(FunctionType &&function) { return std::make_shared<LambdaCallback<std::decay_t<FunctionType>>> (std::forward<FunctionType>(function)); } +template<class FunctionType> +std::unique_ptr<IDestructorCallback> +makeUniqueLambdaCallback(FunctionType &&function) { + return std::make_unique<LambdaCallback<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); +} + } |