diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-21 17:22:29 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-10 09:56:24 +0100 |
commit | 9c78dc6483c5998dcda7a34315dbf1c4504ac04f (patch) | |
tree | f33c7d6f83a3d269c63d2b06afe5fd5a33fec5e5 /searchlib | |
parent | 4c8c90f9ef909041e0066ac9b4ada817c495c5ba (diff) |
Also use encode for write path
Diffstat (limited to 'searchlib')
6 files changed, 70 insertions, 43 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp index 4c34aed74c0..03c04f5c867 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp @@ -37,25 +37,33 @@ void verifyCrc(nbostream & is, Encoding::Crc crcType) { } -void CCITTCRC32None::onEncode(nbostream &os) { - (void) os; +void CCITTCRC32None::onEncode(nbostream &os) const { + size_t start = os.wp(); + assert(getEntries().size() == 1); + serializeEntries(os); + int32_t crc = Encoding::calcCrc(Encoding::Crc::ccitt_crc32, os.c_str()+start, os.size() - start); + os << crc; } void CCITTCRC32None::onDecode(nbostream &is) { verifyCrc(is, Encoding::Crc::ccitt_crc32); nbostream data(is.peek(), is.size() - sizeof(int32_t)); - add(data); + deserializeEntries(data); is.adjustReadPos(is.size()); } -void XXH64None::onEncode(nbostream &os) { - (void) os; +void XXH64None::onEncode(nbostream &os) const { + size_t start = os.wp(); + assert(getEntries().size() == 1); + serializeEntries(os); + int32_t crc = Encoding::calcCrc(Encoding::Crc::xxh64, os.c_str()+start, os.size() - start); + os << crc; } void XXH64None::onDecode(nbostream &is) { verifyCrc(is, Encoding::Crc::xxh64); nbostream data(is.peek(), is.size() - sizeof(int32_t)); - add(data); + deserializeEntries(data); is.adjustReadPos(is.size()); } @@ -67,11 +75,11 @@ XXH64Compressed::decompress(nbostream & is, vespalib::compression::CompressionCo ConstBufferRef compressed(is.peek(), is.size()-sizeof(uint32_t)*2); ::decompress(type, uncompressedLen, compressed, uncompressed, false); nbostream data(uncompressed.getData(), uncompressed.getDataLen()); - add(data); + deserializeEntries(data); is.adjustReadPos(is.size()); } -void XXH64LZ4::onEncode(IChunk::nbostream &os) { +void XXH64LZ4::onEncode(IChunk::nbostream &os) const { (void) os; } @@ -81,7 +89,7 @@ void XXH64LZ4::onDecode(IChunk::nbostream &is) { decompress(is, CompressionConfig::LZ4); } -void XXH64ZSTD::onEncode(IChunk::nbostream &os) { +void XXH64ZSTD::onEncode(IChunk::nbostream &os) const { (void) os; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h index 49a93244cce..ceafe9428da 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/chunks.h +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h @@ -9,14 +9,14 @@ namespace search::transactionlog { class XXH64None : public IChunk { protected: - void onEncode(nbostream &os) override; + void onEncode(nbostream &os) const override; void onDecode(nbostream &is) override; public: }; class CCITTCRC32None : public IChunk { protected: - void onEncode(nbostream &os) override; + void onEncode(nbostream &os) const override; void onDecode(nbostream &is) override; public: }; @@ -31,13 +31,13 @@ private: }; class XXH64LZ4 : public XXH64Compressed { protected: - void onEncode(nbostream &os) override; + void onEncode(nbostream &os) const override; void onDecode(nbostream &is) override; }; class XXH64ZSTD : public XXH64Compressed { protected: - void onEncode(nbostream &os) override; + void onEncode(nbostream &os) const override; void onDecode(nbostream &is) override; }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index bc85e6b0d78..16d10b8183e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -30,7 +30,7 @@ constexpr size_t TARGET_PACKET_SIZE = 0x3f000; string handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, - const Packet::Entry &entry, int bufLen) __attribute__ ((noinline)); + SerialNumRange range, int bufLen) __attribute__ ((noinline)); bool handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, @@ -61,17 +61,17 @@ handleSync(FastOS_FileInterface &file) string handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, - const Packet::Entry &entry, int bufLen) + SerialNumRange range, int bufLen) { string last(FastOS_File::getLastErrorString()); - string e(make_string("%s. File '%s' at position %" PRId64 " for entry %" PRIu64 " of length %u. " - "OS says '%s'. Rewind to last known good position %" PRId64 ".", - text, file.GetFileName(), file.GetPosition(), entry.serial(), bufLen, + string e(make_string("%s. File '%s' at position %" PRId64 " for entries [%zu, %zu] of length %u. " + "OS says '%s'. Rewind to last known good position %zu.", + text, file.GetFileName(), file.GetPosition(), range.from(), range.to(), bufLen, last.c_str(), lastKnownGoodPos)); LOG(error, "%s", e.c_str()); if ( ! file.SetPosition(lastKnownGoodPos) ) { last = FastOS_File::getLastErrorString(); - throw runtime_error(make_string("Failed setting position %" PRId64 " of file '%s' of size %" PRId64 ": OS says '%s'", + throw runtime_error(make_string("Failed setting position %zu of file '%s' of size %zd : OS says '%s'", lastKnownGoodPos, file.GetFileName(), file.GetSize(), last.c_str())); } handleSync(file); @@ -397,10 +397,12 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); + IChunk::UP chunk = IChunk::create(_defaultEncoding); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { - write(*_transLog, entry); + chunk->add(entry); + write(*_transLog, *chunk); _sz++; _range.to(entry.serial()); } else { @@ -529,28 +531,25 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet) } void -DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) +DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk) { - int64_t lastKnownGoodPos(file.GetPosition()); - int32_t crc(0); - uint32_t len(entry.serializedSize() + sizeof(crc)); nbostream os; os << _defaultEncoding.getRaw(); - os << len; - size_t start(os.size()); - entry.serialize(os); - size_t end(os.size()); - crc = Encoding::calcCrc(_defaultEncoding.getCrc(), os.c_str()+start, end - start); - os << crc; - size_t osSize = os.size(); - assert(osSize == len + sizeof(len) + sizeof(uint8_t)); + size_t sizePos = os.wp(); + os << uint32_t(0); + chunk.encode(os); + size_t end = os.wp(); + os.wp(sizePos); + os << uint32_t(end - (sizePos + sizeof(uint32_t))); + os.wp(end); + int64_t lastKnownGoodPos(file.GetPosition()); LockGuard guard(_writeLock); - if ( ! file.CheckedWrite(os.c_str(), osSize) ) { - throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, entry, end - start)); + if ( ! file.CheckedWrite(os.c_str(), os.size()) ) { + throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, chunk.range(), os.size())); } - _writtenSerial = entry.serial(); - _byteSize.store(lastKnownGoodPos + osSize, std::memory_order_release); + _writtenSerial = chunk.range().to(); + _byteSize.store(lastKnownGoodPos + os.size(), std::memory_order_release); } bool diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index b62063d4466..9f92c49eaf5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -52,7 +52,7 @@ private: static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate); static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate); - void write(FastOS_FileInterface &file, const Packet::Entry &entry); + void write(FastOS_FileInterface &file, const IChunk & entry); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp index f1822ead5c9..1f32041a68d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -27,8 +27,15 @@ IChunk::add(const Packet::Entry & entry) { _entries.emplace_back(entry); } +SerialNumRange +IChunk::range() const { + return _entries.empty() + ? SerialNumRange() + : SerialNumRange(_entries.front().serial(), _entries.back().serial()); +} + void -IChunk::add(nbostream & is) { +IChunk::deserializeEntries(nbostream & is) { while (is.good() && !is.empty()) { Packet::Entry e; e.deserialize(is); @@ -37,8 +44,15 @@ IChunk::add(nbostream & is) { } void -IChunk::encode(nbostream & ) { +IChunk::serializeEntries(nbostream & os) const { + for (const auto & e : _entries) { + e.serialize(os); + } +} +void +IChunk::encode(nbostream & os) const { + onEncode(os); } void @@ -48,7 +62,10 @@ IChunk::decode(nbostream & is) { IChunk::UP IChunk::create(uint8_t chunkType) { - Encoding encoding(chunkType); + return create(Encoding(chunkType)); +} +IChunk::UP +IChunk::create(Encoding encoding) { switch (encoding.getCrc()) { case Encoding::Crc::xxh64: switch (encoding.getCompression()) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index cad1cc27495..4f68c51e4c1 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -40,13 +40,16 @@ public: virtual ~IChunk(); const Entries & getEntries() const { return _entries; } void add(const Packet::Entry & entry); - void encode(nbostream & os); + void encode(nbostream & os) const; void decode(nbostream & buf); static UP create(uint8_t chunkType); + static UP create(Encoding chunkType); + SerialNumRange range() const; protected: - virtual void onEncode(nbostream & os) = 0; + virtual void onEncode(nbostream & os) const = 0; virtual void onDecode(nbostream & is) = 0; - void add(nbostream & is); + void deserializeEntries(nbostream & is); + void serializeEntries(nbostream & os) const; private: Entries _entries; }; |