diff options
Diffstat (limited to 'searchlib/src')
-rw-r--r-- | searchlib/src/vespa/searchlib/transactionlog/domain.cpp | 66 | ||||
-rw-r--r-- | searchlib/src/vespa/searchlib/transactionlog/domain.h | 5 |
2 files changed, 66 insertions, 5 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 9e0f1a8a1aa..bd7feec0598 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -113,7 +113,12 @@ Domain::addPart(SerialNum partId, bool isLastPart) { } } -Domain::~Domain() { } +Domain::~Domain() { + MonitorGuard guard(_currentChunkMonitor); + guard.broadcast(); + commitChunk(grabCurrentChunk(guard), guard); + _singleCommitter->shutdown().sync(); +} DomainInfo Domain::getDomainInfo() const @@ -318,22 +323,73 @@ Domain::optionallyRotateFile(SerialNum serialNum) { return dp; } +void +Domain::append(const Packet & packet, Writer::DoneCallback onDone) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + if (_lastSerial >= packet.range().from()) { + throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", + packet.range().from(), _lastSerial)); + } else { + _lastSerial = packet.range().to(); + } + _currentChunk->add(packet, std::move(onDone)); + commitIfFull(guard); +} + Domain::CommitResult Domain::startCommit(DoneCallback onDone) { - (void) onDone; + vespalib::MonitorGuard guard(_currentChunkMonitor); + if ( !_currentChunk->empty() ) { + auto completed = grabCurrentChunk(guard); + completed->setCommitDoneCallback(std::move(onDone)); + CommitResult result(completed->createCommitResult()); + commitChunk(std::move(completed), guard); + return result; + } return CommitResult(); } void -Domain::append(const Packet & packet, Writer::DoneCallback onDone) -{ - (void) onDone; +Domain::commitIfFull(const vespalib::MonitorGuard &guard) { + if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { + auto completed = std::move(_currentChunk); + _currentChunk = std::make_unique<CommitChunk>(_config.getChunkSizeLimit(), completed->stealCallbacks()); + commitChunk(std::move(completed), guard); + } +} + +std::unique_ptr<CommitChunk> +Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + auto chunk = std::move(_currentChunk); + _currentChunk = createCommitChunk(_config); + return chunk; +} + +void +Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { + assert(chunkOrderGuard.monitors(_currentChunkMonitor)); + _singleCommitter->execute( makeLambdaTask([this, chunk = std::move(chunk)]() mutable { + doCommit(std::move(chunk)); + })); +} + +void +Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { + const Packet & packet = chunk->getPacket(); + if (packet.empty()) return; + vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); DomainPart::SP dp = optionallyRotateFile(entry.serial()); dp->commit(entry.serial(), packet); + if (_config.getFSyncOnCommit()) { + dp->sync(); + } cleanSessions(); + LOG(debug, "Releasing %zu acks and %zu entries and %zu bytes.", + chunk->getNumCallBacks(), chunk->getPacket().size(), chunk->sizeBytes()); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 7e77e6ef0ef..041ec27cf23 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -56,6 +56,11 @@ public: uint64_t size() const; Domain & setConfig(const DomainConfig & cfg); private: + void commitIfFull(const vespalib::MonitorGuard & guard); + + std::unique_ptr<CommitChunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); + void commitChunk(std::unique_ptr<CommitChunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); + void doCommit(std::unique_ptr<CommitChunk> chunk); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; |