summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-03-02 08:21:25 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-03-02 08:21:25 +0000
commit8ccbd239b2e78644ebfdff8d425f42b9990b6412 (patch)
tree8db49351614eec5c512586fda975e8b771e048fe /searchlib
parent22e19a3d5dae693f36f62a2b35fab9ef0f99581c (diff)
Since we schedule the last chunk for commit in triggerSyncNow, we can assert that we will be fully synced on the next pull when it happens in the singleCommitter thread.
That allows for further simplification.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp87
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h18
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp13
3 files changed, 51 insertions, 67 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index e649e48b812..c160434a465 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -44,14 +44,11 @@ Domain::Domain(const string &domainName, const string & baseDir, vespalib::Execu
_singleCommitter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128_Ki)),
_executor(executor),
_sessionId(1),
- _syncMonitor(),
- _pendingSync(false),
- _done_sync_tasks(),
_name(domainName),
_parts(),
- _lock(),
- _currentChunkMonitor(),
- _sessionLock(),
+ _partsMutex(),
+ _currentChunkMutex(),
+ _sessionMutex(),
_sessions(),
_maxSessionRunTime(),
_baseDir(baseDir),
@@ -103,7 +100,7 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
dp->erase(dp->range().to() + 1);
} else {
{
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
_parts[partId] = dp;
}
if (! isLastPart) {
@@ -113,9 +110,10 @@ Domain::addPart(SerialNum partId, bool isLastPart) {
}
Domain::~Domain() {
- std::unique_lock guard(_currentChunkMonitor);
- _currentChunkCond.notify_all();
- commitChunk(grabCurrentChunk(guard), guard);
+ {
+ std::unique_lock guard(_currentChunkMutex);
+ commitChunk(grabCurrentChunk(guard), guard);
+ }
vespalib::Gate gate;
_singleCommitter->execute(makeLambdaTask([callback=std::make_unique<vespalib::GateCallback>(gate)]() { (void) callback;}));
gate.await();
@@ -124,7 +122,7 @@ Domain::~Domain() {
DomainInfo
Domain::getDomainInfo() const
{
- std::unique_lock guard(_lock);
+ std::unique_lock guard(_partsMutex);
DomainInfo info(SerialNumRange(begin(guard), end(guard)), size(guard), byteSize(guard), _maxSessionRunTime);
for (const auto &entry: _parts) {
const DomainPart &part = *entry.second;
@@ -136,12 +134,12 @@ Domain::getDomainInfo() const
SerialNum
Domain::begin() const
{
- return begin(UniqueLock(_lock));
+ return begin(UniqueLock(_partsMutex));
}
void
Domain::verifyLock(const UniqueLock & guard) const {
- assert(guard.mutex() == &_lock);
+ assert(guard.mutex() == &_partsMutex);
assert(guard.owns_lock());
}
SerialNum
@@ -158,7 +156,7 @@ Domain::begin(const UniqueLock & guard) const
SerialNum
Domain::end() const
{
- return end(UniqueLock(_lock));
+ return end(UniqueLock(_partsMutex));
}
SerialNum
@@ -175,7 +173,7 @@ Domain::end(const UniqueLock & guard) const
size_t
Domain::byteSize() const
{
- return byteSize(UniqueLock(_lock));
+ return byteSize(UniqueLock(_partsMutex));
}
size_t
@@ -194,7 +192,7 @@ SerialNum
Domain::getSynced() const
{
SerialNum s(0);
- UniqueLock guard(_lock);
+ UniqueLock guard(_partsMutex);
if (_parts.empty()) {
return s;
}
@@ -210,35 +208,22 @@ Domain::getSynced() const
void
-Domain::triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task)
+Domain::triggerSyncNow(std::unique_ptr<vespalib::IDestructorCallback> after_sync)
{
{
- std::unique_lock guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMutex);
commitAndTransferResponses(guard);
}
- std::unique_lock guard(_syncMonitor);
- if (done_sync_task) {
- _done_sync_tasks.push_back(std::move(done_sync_task));
- }
- if (!_pendingSync) {
- _pendingSync = true;
- _singleCommitter->execute(makeLambdaTask([this, domainPart=getActivePart()]() {
- domainPart->sync();
- std::lock_guard monitorGuard(_syncMonitor);
- _pendingSync = false;
- for (auto &task : _done_sync_tasks) {
- auto failed_task = _singleCommitter->execute(std::move(task));
- assert(!failed_task);
- }
- _done_sync_tasks.clear();
- }));
- }
+ _singleCommitter->execute(makeLambdaTask([after_sync=std::move(after_sync), domainPart=getActivePart()]() {
+ (void) after_sync;
+ domainPart->sync();
+ }));
}
DomainPart::SP
Domain::findPart(SerialNum s)
{
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
DomainPartList::iterator it(_parts.upper_bound(s));
if (!_parts.empty() && it != _parts.begin()) {
DomainPartList::iterator prev(it);
@@ -255,14 +240,14 @@ Domain::findPart(SerialNum s)
DomainPart::SP
Domain::getActivePart() {
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
return _parts.rbegin()->second;
}
uint64_t
Domain::size() const
{
- return size(UniqueLock(_lock));
+ return size(UniqueLock(_partsMutex));
}
uint64_t
@@ -280,7 +265,7 @@ SerialNum
Domain::findOldestActiveVisit() const
{
SerialNum oldestActive(std::numeric_limits<SerialNum>::max());
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
for (const auto & pair : _sessions) {
Session * session(pair.second.get());
if (!session->inSync()) {
@@ -296,7 +281,7 @@ Domain::cleanSessions()
if ( _sessions.empty()) {
return;
}
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
for (SessionList::iterator it(_sessions.begin()), mt(_sessions.end()); it != mt; ) {
Session * session(it->second.get());
if (session->inSync()) {
@@ -316,7 +301,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
dp->close();
dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _fileHeaderContext, false);
{
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
_parts[serialNum] = dp;
assert(_parts.rbegin()->first == serialNum);
}
@@ -327,7 +312,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) {
void
Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
- std::unique_lock guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMutex);
if (_lastSerial >= packet.range().from()) {
throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").",
packet.range().from(), _lastSerial));
@@ -340,7 +325,7 @@ Domain::append(const Packet & packet, Writer::DoneCallback onDone) {
Domain::CommitResult
Domain::startCommit(DoneCallback onDone) {
- std::unique_lock guard(_currentChunkMonitor);
+ std::unique_lock guard(_currentChunkMutex);
if ( !_currentChunk->empty() ) {
auto completed = grabCurrentChunk(guard);
completed->setCommitDoneCallback(std::move(onDone));
@@ -367,7 +352,7 @@ Domain::commitAndTransferResponses(const UniqueLock &guard) {
std::unique_ptr<CommitChunk>
Domain::grabCurrentChunk(const UniqueLock & guard) {
- assert(guard.mutex() == &_currentChunkMonitor && guard.owns_lock());
+ assert(guard.mutex() == &_currentChunkMutex && guard.owns_lock());
auto chunk = std::move(_currentChunk);
_currentChunk = createCommitChunk(_config);
return chunk;
@@ -375,7 +360,7 @@ Domain::grabCurrentChunk(const UniqueLock & guard) {
void
Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunkOrderGuard) {
- assert(chunkOrderGuard.mutex() == &_currentChunkMonitor && chunkOrderGuard.owns_lock());
+ assert(chunkOrderGuard.mutex() == &_currentChunkMutex && chunkOrderGuard.owns_lock());
if (chunk->getPacket().empty()) return;
chunk->shrinkPayloadToFit();
std::promise<SerializedChunk> promise;
@@ -410,7 +395,7 @@ Domain::erase(SerialNum to)
{
bool retval(true);
/// Do not erase the last element
- UniqueLock guard(_lock);
+ UniqueLock guard(_partsMutex);
for (DomainPartList::iterator it(_parts.begin()); (_parts.size() > 1) && (it->second.get()->range().to() < to); it = _parts.begin()) {
DomainPart::SP dp(it->second);
_parts.erase(it);
@@ -433,7 +418,7 @@ Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, std::uniq
SerialNumRange range(from, to);
auto session = std::make_shared<Session>(_sessionId++, range, domain, std::move(dest));
int id = session->id();
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
_sessions[id] = std::move(session);
return id;
}
@@ -442,7 +427,7 @@ int
Domain::startSession(int sessionId)
{
int retval(-1);
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
SessionList::iterator found = _sessions.find(sessionId);
if (found != _sessions.end()) {
found->second->setStartTime(vespalib::steady_clock::now());
@@ -461,7 +446,7 @@ Domain::closeSession(int sessionId)
int retval(-1);
DurationSeconds sessionRunTime(0);
{
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
SessionList::iterator found = _sessions.find(sessionId);
if (found != _sessions.end()) {
sessionRunTime = (vespalib::steady_clock::now() - found->second->getStartTime());
@@ -470,7 +455,7 @@ Domain::closeSession(int sessionId)
}
while (retval == 1) {
std::this_thread::sleep_for(10ms);
- std::lock_guard guard(_sessionLock);
+ std::lock_guard guard(_sessionMutex);
SessionList::iterator found = _sessions.find(sessionId);
if (found != _sessions.end()) {
if ( ! found->second->isVisitRunning()) {
@@ -482,7 +467,7 @@ Domain::closeSession(int sessionId)
}
}
{
- std::lock_guard guard(_lock);
+ std::lock_guard guard(_partsMutex);
if (sessionRunTime > _maxSessionRunTime) {
_maxSessionRunTime = sessionRunTime;
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 261a38d8ed4..d7b59d676dd 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -35,7 +35,7 @@ public:
SerialNum begin() const;
SerialNum end() const;
SerialNum getSynced() const;
- void triggerSyncNow(std::unique_ptr<vespalib::Executor::Task> done_sync_task);
+ void triggerSyncNow(std::unique_ptr<vespalib::IDestructorCallback> after_sync);
bool getMarkedDeleted() const { return _markedDeleted; }
void markDeleted() { _markedDeleted = true; }
@@ -80,23 +80,19 @@ private:
using SessionList = std::map<int, std::shared_ptr<Session>>;
using DomainPartList = std::map<SerialNum, DomainPartSP>;
using DurationSeconds = std::chrono::duration<double>;
- using TaskUP = std::unique_ptr<vespalib::Executor::Task>;
+ using Executor = vespalib::Executor;
DomainConfig _config;
std::unique_ptr<CommitChunk> _currentChunk;
SerialNum _lastSerial;
- std::unique_ptr<vespalib::Executor> _singleCommitter;
- vespalib::Executor &_executor;
+ std::unique_ptr<Executor> _singleCommitter;
+ Executor &_executor;
std::atomic<int> _sessionId;
- std::mutex _syncMonitor;
- bool _pendingSync;
- std::vector<TaskUP> _done_sync_tasks;
vespalib::string _name;
DomainPartList _parts;
- mutable std::mutex _lock;
- std::mutex _currentChunkMonitor;
- std::condition_variable _currentChunkCond;
- mutable std::mutex _sessionLock;
+ mutable std::mutex _partsMutex;
+ std::mutex _currentChunkMutex;
+ mutable std::mutex _sessionMutex;
SessionList _sessions;
DurationSeconds _maxSessionRunTime;
vespalib::string _baseDir;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index a0853dcbd86..b5806e1f962 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -9,7 +9,6 @@
#include <vespa/vespalib/util/cpu_usage.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <fstream>
@@ -42,7 +41,7 @@ public:
SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain::SP domain, SerialNum syncTo) noexcept;
~SyncHandler();
- void poll();
+ bool poll();
};
SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain::SP domain, SerialNum syncTo) noexcept
@@ -55,7 +54,7 @@ SyncHandler::SyncHandler(std::atomic<bool>& closed, FRT_RPCRequest *req, Domain:
SyncHandler::~SyncHandler() = default;
-void
+bool
SyncHandler::poll()
{
SerialNum synced(_domain->getSynced());
@@ -67,9 +66,13 @@ SyncHandler::poll()
rvals.AddInt32(0);
rvals.AddInt64(synced);
_req.Return();
- } else {
- _domain->triggerSyncNow(vespalib::makeLambdaTask([self = shared_from_this()]() { self->poll(); }));
+ return true;
}
+ _domain->triggerSyncNow(vespalib::makeUniqueLambdaCallback([self = shared_from_this()]() {
+ bool completed = self->poll();
+ assert(completed);
+ }));
+ return false;
}
VESPA_THREAD_STACK_TAG(tls_executor);