summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp4
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp6
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp87
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h18
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp13
-rw-r--r--vespalib/src/vespa/vespalib/util/destructor_callbacks.h9
6 files changed, 64 insertions, 73 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
index 3eda5146af1..060215c4521 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
@@ -6,7 +6,7 @@
#include <vespa/vespalib/util/destructor_callbacks.h>
using vespalib::makeLambdaTask;
-using vespalib::makeLambdaCallback;
+using vespalib::makeSharedLambdaCallback;
namespace storage::spi::dummy {
@@ -32,7 +32,7 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask>
}
_inFlight.insert(bucket.getBucket());
}
- bucketTask->run(bucket, makeLambdaCallback([this, bucket]() {
+ bucketTask->run(bucket, makeSharedLambdaCallback([this, bucket]() {
std::unique_lock guard(_lock);
assert(_inFlight.contains(bucket.getBucket()));
_inFlight.erase(bucket.getBucket());
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
index d5297fe8f8c..1153a09d09f 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -37,7 +37,7 @@ using search::queryeval::ISourceSelector;
using search::queryeval::Source;
using search::SerialNum;
using vespalib::makeLambdaTask;
-using vespalib::makeLambdaCallback;
+using vespalib::makeSharedLambdaCallback;
using std::ostringstream;
using vespalib::string;
using vespalib::Executor;
@@ -313,7 +313,7 @@ IndexMaintainer::loadDiskIndex(const string &indexDir)
_disk_indexes->setActive(indexDir, stats.sizeOnDisk());
auto retval = std::make_shared<DiskIndexWithDestructorCallback>(
std::move(index),
- makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }),
+ makeSharedLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }),
_layout, *_disk_indexes);
if (LOG_WOULD_LOG(event)) {
EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed()));
@@ -336,7 +336,7 @@ IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex)
_disk_indexes->setActive(indexDir, stats.sizeOnDisk());
auto retval = std::make_shared<DiskIndexWithDestructorCallback>(
std::move(index),
- makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }),
+ makeSharedLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }),
_layout, *_disk_indexes);
if (LOG_WOULD_LOG(event)) {
EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed()));
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index e649e48b812..c160434a465 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -44,14 +44,11 @@ Domain::Domain(const string &domainName, const string & baseDir, vespalib::Execu
_singleCommitter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128_Ki)),
_executor(executor),
_sessionId(1),
- _syncMonitor(),
- _pendingSync(false),
- _done_sync_tasks(),
_name(domainName),
_parts(),
- _lock(),
- _currentChunkMonitor(),
- _sessionLock(),
+ _partsMutex(),
+ _currentChunkMutex(),
+ _sessionMutex(),
_sessions(),
_maxSessionRunTime(),
_baseDir(baseDir),
@@ -103,7 +100,7 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
dp->erase(dp->range().to() + 1);
} else {
{
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
_parts[partId] = dp;
}
if (! isLastPart) {
@@ -113,9 +110,10 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
}
Domain::~Domain() {
- std::unique_lock guard(_currentChunkMonitor);
- _currentChunkCond.notify_all();
- commitChunk(grabCurrentChunk(guard), guard);
+ {
+ std::unique_lock guard(_currentChunkMutex);
+ commitChunk(grabCurrentChunk(guard), guard);
+ }
vespalib::Gate gate;
_singleCommitter->execute(makeLambdaTask([callback=std::make_unique<vespalib::GateCallback>(gate)]() { (void) callback;}));
gate.await();
@@ -124,7 +122,7 @@ Domain::~Domain() {
DomainInfo
Domain::getDomainInfo() const
{
- std::unique_lock guard(_lock);
+ std::unique_lock guard(_partsMutex);
DomainInfo info(SerialNumRange(begin(guard), end(guard)), size(guard), byteSize(guard), _maxSessionRunTime);
for (const auto &entry: _parts) {
const DomainPart &part = *entry.second;
@@ -136,12 +134,12 @@ Domain::getDomainInfo() const
SerialNum
Domain::begin() const
{
- return begin(UniqueLock(_lock));
+ return begin(UniqueLock(_partsMutex));
}
void
Domain::verifyLock(const UniqueLock & guard) const {
- assert(guard.mutex() == &_lock);
+ assert(guard.mutex() == &_partsMutex);
assert(guard.owns_lock());
}
SerialNum
@@ -158,7 +156,7 @@ Domain::begin(const UniqueLock & guard) const
SerialNum
Domain::end() const
{
- return end(UniqueLock(_lock));
+ return end(UniqueLock(_partsMutex));
}
SerialNum
@@ -175,7 +173,7 @@ Domain::end(const UniqueLock & guard) const
size_t
Domain::byteSize() const
{
- return byteSize(UniqueLock(_lock));
+ return byteSize(UniqueLock(_partsMutex));
}
size_t
@@ -194,7 +192,7 @@ SerialNum
Domain::getSynced() const
{
SerialNum s(0);
- UniqueLock guard(_lock);
+ UniqueLock guard(_partsMutex);
if (_parts.empty()) {
return s;
}
@@ -210,35 +208,22 @@ Domain::getSynced() const
void
-Domain::triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task)
+Domain::triggerSyncNow(std::unique_ptr<vespalib::IDestructorCallback> after_sync)
{
{
- std::unique_lock guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMutex);
commitAndTransferResponses(guard);
}
- std::unique_lock guard(_syncMonitor);
- if (done_sync_task) {
- _done_sync_tasks.push_back(std::move(done_sync_task));
- }
- if (!_pendingSync) {
- _pendingSync = true;
- _singleCommitter->execute(makeLambdaTask([this, domainPart=getActivePart()]() {
- domainPart->sync();
- std::lock_guard monitorGuard(_syncMonitor);
- _pendingSync = false;
- for (auto &task : _done_sync_tasks) {
- auto failed_task = _singleCommitter->execute(std::move(task));
- assert(!failed_task);
- }
- _done_sync_tasks.clear();
- }));
- }
+ _singleCommitter->execute(makeLambdaTask([after_sync=std::move(after_sync), domainPart=getActivePart()]() {
+ (void) after_sync;
+ domainPart->sync();
+ }));
}
DomainPart::SP
Domain::findPart(SerialNum s)
{
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
DomainPartList::iterator it(_parts.upper_bound(s));
if (!_parts.empty() && it != _parts.begin()) {
DomainPartList::iterator prev(it);
@@ -255,14 +240,14 @@ Domain::findPart(SerialNum s)
DomainPart::SP
Domain::getActivePart() {
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
return _parts.rbegin()->second;
}
uint64_t
Domain::size() const
{
- return size(UniqueLock(_lock));
+ return size(UniqueLock(_partsMutex));
}
uint64_t
@@ -280,7 +265,7 @@ SerialNum
Domain::findOldestActiveVisit() const
{
SerialNum oldestActive(std::numeric_limits<SerialNum>::max());
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
for (const auto & pair : _sessions) {
Session * session(pair.second.get());
if (!session->inSync()) {
@@ -296,7 +281,7 @@ Domain::cleanSessions()
if ( _sessions.empty()) {
return;
}
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
for (SessionList::iterator it(_sessions.begin()), mt(_sessions.end()); it != mt; ) {
Session * session(it->second.get());
if (session->inSync()) {
@@ -316,7 +301,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
dp->close();
dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _fileHeaderContext, false);
{
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
_parts[serialNum] = dp;
assert(_parts.rbegin()->first == serialNum);
}
@@ -327,7 +312,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
void
Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
- std::unique_lock guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMutex);
if (_lastSerial >= packet.range().from()) {
throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
packet.range().from(), _lastSerial));
@@ -340,7 +325,7 @@ Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
Domain::CommitResult
Domain::startCommit(DoneCallback onDone) {
- std::unique_lock guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMutex);
if ( !_currentChunk->empty() ) {
auto completed = grabCurrentChunk(guard);
completed->setCommitDoneCallback(std::move(onDone));
@@ -367,7 +352,7 @@ Domain::commitAndTransferResponses(const UniqueLock &guard) {
std::unique_ptr<CommitChunk>
Domain::grabCurrentChunk(const UniqueLock & guard) {
- assert(guard.mutex() == &_currentChunkMonitor && guard.owns_lock());
+ assert(guard.mutex() == &_currentChunkMutex && guard.owns_lock());
auto chunk = std::move(_currentChunk);
_currentChunk = createCommitChunk(_config);
return chunk;
@@ -375,7 +360,7 @@ Domain::grabCurrentChunk(const UniqueLock & guard) {
void
Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) {
- assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock());
+ assert(chunkOrderGuard.mutex() == &_currentChunkMutex && chunkOrderGuard.owns_lock());
if (chunk->getPacket().empty()) return;
chunk->shrinkPayloadToFit();
std::promise<SerializedChunk> promise;
@@ -410,7 +395,7 @@ Domain::erase(SerialNum to)
{
bool retval(true);
/// Do not erase the last element
- UniqueLock guard(_lock);
+ UniqueLock guard(_partsMutex);
for (DomainPartList::iterator it(_parts.begin()); (_parts.size() > 1) && (it->second.get()->range().to() < to); it = _parts.begin()) {
DomainPart::SP dp(it->second);
_parts.erase(it);
@@ -433,7 +418,7 @@ Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, std::uniq
SerialNumRange range(from, to);
auto session = std::make_shared<Session>(_sessionId++, range, domain, std::move(dest));
int id = session->id();
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
_sessions[id] = std::move(session);
return id;
}
@@ -442,7 +427,7 @@ int
Domain::startSession(int sessionId)
{
int retval(-1);
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
SessionList::iterator found = _sessions.find(sessionId);
if (found != _sessions.end()) {
found->second->setStartTime(vespalib::steady_clock::now());
@@ -461,7 +446,7 @@ Domain::closeSession(int sessionId)
int retval(-1);
DurationSeconds sessionRunTime(0);
{
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
SessionList::iterator found = _sessions.find(sessionId);
if (found != _sessions.end()) {
sessionRunTime = (vespalib::steady_clock::now() - found->second->getStartTime());
@@ -470,7 +455,7 @@ Domain::closeSession(int sessionId)
}
while (retval == 1) {
std::this_thread::sleep_for(10ms);
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
SessionList::iterator found = _sessions.find(sessionId);
if (found != _sessions.end()) {
if ( ! found->second->isVisitRunning()) {
@@ -482,7 +467,7 @@ Domain::closeSession(int sessionId)
}
}
{
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
if (sessionRunTime > _maxSessionRunTime) {
_maxSessionRunTime = sessionRunTime;
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 261a38d8ed4..d7b59d676dd 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -35,7 +35,7 @@ public:
SerialNum begin() const;
SerialNum end() const;
SerialNum getSynced() const;
- void triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task);
+ void triggerSyncNow(std::unique_ptr<vespalib::IDestructorCallback> after_sync);
bool getMarkedDeleted() const { return _markedDeleted; }
void markDeleted() { _markedDeleted = true; }
@@ -80,23 +80,19 @@ private:
using SessionList = std::map<int, std::shared_ptr<Session>>;
using DomainPartList = std::map<SerialNum, DomainPartSP>;
using DurationSeconds = std::chrono::duration<double>;
- using TaskUP = std::unique_ptr<vespalib::Executor::Task>;
+ using Executor = vespalib::Executor;
DomainConfig _config;
std::unique_ptr<CommitChunk> _currentChunk;
SerialNum _lastSerial;
- std::unique_ptr<vespalib::Executor> _singleCommitter;
- vespalib::Executor &_executor;
+ std::unique_ptr<Executor> _singleCommitter;
+ Executor &_executor;
std::atomic<int> _sessionId;
- std::mutex _syncMonitor;
- bool _pendingSync;
- std::vector<TaskUP> _done_sync_tasks;
vespalib::string _name;
DomainPartList _parts;
- mutable std::mutex _lock;
- std::mutex _currentChunkMonitor;
- std::condition_variable _currentChunkCond;
- mutable std::mutex _sessionLock;
+ mutable std::mutex _partsMutex;
+ std::mutex _currentChunkMutex;
+ mutable std::mutex _sessionMutex;
SessionList _sessions;
DurationSeconds _maxSessionRunTime;
vespalib::string _baseDir;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index a0853dcbd86..b5806e1f962 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -9,7 +9,6 @@
#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <fstream>
@@ -42,7 +41,7 @@ public:
SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain::SP domain, SerialNum syncTo) noexcept;
~SyncHandler();
- void poll();
+ bool poll();
};
SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain::SP domain, SerialNum syncTo) noexcept
@@ -55,7 +54,7 @@ SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain:
SyncHandler::~SyncHandler() = default;
-void
+bool
SyncHandler::poll()
{
SerialNum synced(_domain->getSynced());
@@ -67,9 +66,13 @@ SyncHandler::poll()
rvals.AddInt32(0);
rvals.AddInt64(synced);
_req.Return();
- } else {
- _domain->triggerSyncNow(vespalib::makeLambdaTask([self = shared_from_this()]() { self->poll(); }));
+ return true;
}
+ _domain->triggerSyncNow(vespalib::makeUniqueLambdaCallback([self = shared_from_this()]() {
+ bool completed = self->poll();
+ assert(completed);
+ }));
+ return false;
}
VESPA_THREAD_STACK_TAG(tls_executor);
diff --git a/vespalib/src/vespa/vespalib/util/destructor_callbacks.h b/vespalib/src/vespa/vespalib/util/destructor_callbacks.h
index 7fc15e0185e..7452d72ea44 100644
--- a/vespalib/src/vespa/vespalib/util/destructor_callbacks.h
+++ b/vespalib/src/vespa/vespalib/util/destructor_callbacks.h
@@ -44,10 +44,17 @@ private:
template<class FunctionType>
std::shared_ptr<IDestructorCallback>
-makeLambdaCallback(FunctionType &&function) {
+makeSharedLambdaCallback(FunctionType &&function) {
return std::make_shared<LambdaCallback<std::decay_t<FunctionType>>>
(std::forward<FunctionType>(function));
}
+template<class FunctionType>
+std::unique_ptr<IDestructorCallback>
+makeUniqueLambdaCallback(FunctionType &&function) {
+ return std::make_unique<LambdaCallback<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function));
+}
+
}