diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-06 13:23:50 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-06 13:27:42 +0000 |
commit | 65c26f63261639575911dcda9a5d43dfcf7ebb04 (patch) | |
tree | 6f80c844486ab92802f8ee032448b3e5ae550d3d /searchlib | |
parent | 8072a49a023e19d5bf1246c5ce46205d2bc10631 (diff) |
- Separate encoding and actual persitence of the transaction log.
- Refactor in preparation of multithreaded encode.
Diffstat (limited to 'searchlib')
5 files changed, 98 insertions, 64 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 24943b53e6d..7fa87f3ec09 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -56,11 +56,13 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec _fileHeaderContext(fileHeaderContext), _markedDeleted(false) { - int retval(0); - if ((retval = makeDirectory(_baseDir.c_str())) != 0) { + assert(_config.getEncoding().getCompression() != Encoding::Compression::none); + int retval = makeDirectory(_baseDir.c_str()); + if (retval != 0) { throw runtime_error(fmt("Failed creating basedirectory %s r(%d), e(%d)", _baseDir.c_str(), retval, errno)); } - if ((retval = makeDirectory(dir().c_str())) != 0) { + retval = makeDirectory(dir().c_str()); + if (retval != 0) { throw runtime_error(fmt("Failed creating domaindir %s r(%d), e(%d)", dir().c_str(), retval, errno)); } SerialNumList partIdVector = scanDir(); @@ -76,8 +78,7 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec } pending.waitForZeroRefCount(); if (_parts.empty() || _parts.crbegin()->second->isClosed()) { - _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, false); + _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _fileHeaderContext, false); vespalib::File::sync(dir()); } _lastSerial = end(); @@ -86,13 +87,13 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & exec Domain & Domain::setConfig(const DomainConfig & cfg) { _config = cfg; + assert(_config.getEncoding().getCompression() != Encoding::Compression::none); return *this; } void Domain::addPart(SerialNum partId, bool isLastPart) { - auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, isLastPart); + auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _fileHeaderContext, isLastPart); if (dp->size() == 0) { // Only last domain part is allowed to be truncated down to // empty size. @@ -331,8 +332,7 @@ Domain::optionallyRotateFile(SerialNum serialNum) { triggerSyncNow({}); waitPendingSync(_syncMonitor, _syncCond, _pendingSync); dp->close(); - dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, false); + dp = std::make_shared<DomainPart>(_name, dir(), serialNum, _fileHeaderContext, false); { std::lock_guard guard(_lock); _parts[serialNum] = dp; @@ -399,14 +399,16 @@ Domain::commitChunk(std::unique_ptr<CommitChunk> chunk, const UniqueLock & chunk })); } + + void Domain::doCommit(std::unique_ptr<CommitChunk> chunk) { const Packet & packet = chunk->getPacket(); if (packet.empty()) return; - SerialNum firstSerial = packet.range().from(); - DomainPart::SP dp = optionallyRotateFile(firstSerial); - dp->commit(firstSerial, packet); + SerializedChunk serialized(packet, _config.getEncoding(), _config.getCompressionlevel()); + DomainPart::SP dp = optionallyRotateFile(packet.range().from()); + dp->commit(serialized); if (_config.getFSyncOnCommit()) { dp->sync(); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 3dad67df177..2ca2f15545d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -247,11 +247,9 @@ DomainPart::buildPacketMapping(bool allowTruncate) return currPos; } -DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, - uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) - : _encoding(encoding), - _compressionLevel(compressionLevel), - _lock(), +DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, + const FileHeaderContext &fileHeaderContext, bool allowTruncate) + : _lock(), _fileLock(), _range(s), _sz(0), @@ -379,35 +377,21 @@ DomainPart::erase(SerialNum to) } void -DomainPart::commit(SerialNum firstSerial, const Packet &packet) +DomainPart::commit(const SerializedChunk & serialized) { + SerialNumRange range = serialized.range(); + int64_t firstPos(byteSize()); - nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size()); + assert(_range.to() < range.to()); + _sz += serialized.getNumEntries(); + _range.to(range.to()); if (_range.from() == 0) { - _range.from(firstSerial); - } - IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); - for (size_t i(0); h.size() > 0; i++) { - //LOG(spam, - //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", - //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); - Packet::Entry entry; - entry.deserialize(h); - if (_range.to() < entry.serial()) { - chunk->add(entry); - assert(_encoding.getCompression() != Encoding::Compression::none); - _sz++; - _range.to(entry.serial()); - } else { - throw runtime_error(fmt("Incoming serial number(%" PRIu64 ") must be bigger than the last one (%" PRIu64 ").", - entry.serial(), _range.to())); - } - } - if ( ! chunk->getEntries().empty()) { - write(*_transLog, *chunk); + _range.from(range.from()); } + + write(*_transLog, range, serialized.getData()); std::lock_guard guard(_lock); - _skipList.emplace_back(firstSerial, firstPos); + _skipList.emplace_back(range.from(), firstPos); } void @@ -442,26 +426,15 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet) } void -DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk) +DomainPart::write(FastOS_FileInterface &file, SerialNumRange range, vespalib::ConstBufferRef buf) { - nbostream os; - size_t begin = os.wp(); - os << _encoding.getRaw(); // Placeholder for encoding - os << uint32_t(0); // Placeholder for size - Encoding realEncoding = chunk.encode(os); - size_t end = os.wp(); - os.wp(0); - os << realEncoding.getRaw(); //Patching real encoding - os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t))); // Patching actual size. - os.wp(end); std::lock_guard guard(_writeLock); - if ( ! file.CheckedWrite(os.data(), os.size()) ) { - throw runtime_error(handleWriteError("Failed writing the entry.", file, byteSize(), chunk.range(), os.size())); + if ( ! file.CheckedWrite(buf.data(), buf.size()) ) { + throw runtime_error(handleWriteError("Failed writing the entry.", file, byteSize(), range, buf.size())); } - LOG(debug, "Wrote chunk with %zu entries and %zu bytes, range[%" PRIu64 ", %" PRIu64 "] encoding(wanted=%x, real=%x)", - chunk.getEntries().size(), os.size(), chunk.range().from(), chunk.range().to(), _encoding.getRaw(), realEncoding.getRaw()); - _writtenSerial = chunk.range().to(); - _byteSize.fetch_add(os.size(), std::memory_order_release); + LOG(debug, "Wrote chunk with and %zu bytes, range[%" PRIu64 ", %" PRIu64 "]", buf.size(), range.from(), range.to()); + _writtenSerial = range.to(); + _byteSize.fetch_add(buf.size(), std::memory_order_release); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 9ab0db54391..ea5290c433b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -19,13 +19,13 @@ public: using SP = std::shared_ptr<DomainPart>; DomainPart(const DomainPart &) = delete; DomainPart& operator=(const DomainPart &) = delete; - DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, - uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); + DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, + const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); ~DomainPart(); const vespalib::string &fileName() const { return _fileName; } - void commit(SerialNum firstSerial, const Packet &packet); + void commit(const SerializedChunk & serialized); bool erase(SerialNum to); bool visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet); bool close(); @@ -49,7 +49,7 @@ private: static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate); static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate); - void write(FastOS_FileInterface &file, const IChunk & entry); + void write(FastOS_FileInterface &file, SerialNumRange range, vespalib::ConstBufferRef buf); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo @@ -69,8 +69,6 @@ private: SerialNum _id; uint64_t _pos; }; - const Encoding _encoding; - const uint8_t _compressionLevel; std::mutex _lock; std::mutex _fileLock; SerialNumRange _range; diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp index ee1631ea8c2..e3d98cd576d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -8,6 +8,9 @@ #include <cassert> #include <ostream> +#include <vespa/log/log.h> +LOG_SETUP(".searchlib.transactionlog.ichunk"); + using std::make_unique; using vespalib::make_string_short::fmt; using vespalib::nbostream_longlivedbuf; @@ -115,4 +118,46 @@ std::ostream & operator << (std::ostream & os, Encoding e) { return os << "crc=" << e.getCrc() << " compression=" << e.getCompression(); } + +void +encode(vespalib::nbostream & os, const IChunk & chunk, Encoding encoding) { + size_t begin = os.wp(); + os << encoding.getRaw(); // Placeholder for encoding + os << uint32_t(0); // Placeholder for size + Encoding realEncoding = chunk.encode(os); + size_t end = os.wp(); + os.wp(0); + os << realEncoding.getRaw(); //Patching real encoding + os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t))); // Patching actual size. + os.wp(end); + SerialNumRange range = chunk.range(); + LOG(spam, "Encoded chunk with %zu entries and %zu bytes, range[%" PRIu64 ", %" PRIu64 "] encoding(wanted=%x, real=%x)", + chunk.getEntries().size(), os.size(), range.from(), range.to(), encoding.getRaw(), realEncoding.getRaw()); +} + +SerializedChunk::SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel) + : _os(), + _range(packet.range()), + _numEntries(packet.size()) +{ + nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size()); + + IChunk::UP chunk = IChunk::create(encoding, compressionLevel); + SerialNum prev = 0; + for (size_t i(0); h.size() > 0; i++) { + //LOG(spam, + //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", + //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); + Packet::Entry entry; + entry.deserialize(h); + assert (prev < entry.serial()); + chunk->add(entry); + prev = entry.serial(); + } + assert(! chunk->getEntries().empty()); + encode(_os, *chunk, encoding); +} +vespalib::ConstBufferRef SerializedChunk::getData() const { + return vespalib::ConstBufferRef(_os.data(), _os.size()); +} } diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index 02bd0ce9426..dccfd6617f5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -33,6 +33,22 @@ private: std::ostream & operator << (std::ostream & os, Encoding e); /** + * Represents a completely encoded chunk with a buffer ready to be persisted, + * and the range and number of entries it covers. + */ +class SerializedChunk { +public: + SerializedChunk(const Packet & packet, Encoding encoding, uint8_t compressionLevel); + vespalib::ConstBufferRef getData() const; + SerialNumRange range() const { return _range; } + size_t getNumEntries() const { return _numEntries; } +private: + vespalib::nbostream _os; + SerialNumRange _range; + size_t _numEntries; +}; + +/** * Interface for different chunk formats. * Format specifies both crc type, and compression type. */ |