diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-04 12:42:59 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-04 12:42:59 +0200 |
commit | cdf17e68e3e9df196705ebe80b5002f6e23fca19 (patch) | |
tree | 19cabca511b8d2c60d0a7a325eacb6d500bffa1c | |
parent | 503b0ff5037f0db031b3410899e1f9cccb23bd0a (diff) | |
parent | c9f9adce2b5352c2b5d4653d1e08bde9e7f707b7 (diff) |
Merge pull request #14284 from vespa-engine/balder/mandatory-error-check-in-add
Balder/mandatory error check in add
5 files changed, 135 insertions, 185 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index d003bad0582..b7eb56d1fd9 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -251,7 +251,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) Packet b(DEFAULT_PACKET_SIZE); b.add(e2); b.add(e3); - EXPECT_FALSE(b.add(e1)); + EXPECT_EXCEPTION(b.add(e1), std::runtime_error, ""); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size()))); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size()))); EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())), diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index 40a065277be..afaaa349b64 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -9,7 +9,7 @@ namespace search::transactionlog { using vespalib::nbostream; using vespalib::nbostream_longlivedbuf; -using vespalib::make_string; +using vespalib::make_string_short::fmt; using std::runtime_error; namespace { @@ -18,7 +18,7 @@ void throwRangeError(SerialNum prev, SerialNum next) __attribute__((noinline)); void throwRangeError(SerialNum prev, SerialNum next) { - throw runtime_error(make_string("The new serialnum %" PRIu64 " is not higher than the old one %" PRIu64 "", next, prev)); + throw runtime_error(fmt("The new serialnum %" PRIu64 " is not higher than the old one %" PRIu64 "", next, prev)); } } @@ -51,7 +51,6 @@ SerialNumRange::cmp(const SerialNumRange & b) const Packet::Packet(const void * buf, size_t sz) : _count(0), _range(), - _limit(sz), _buf(static_cast<const char *>(buf), sz) { nbostream_longlivedbuf os(_buf.data(), sz); @@ -105,22 +104,21 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) : _type(t), _valid(true), _data(d) -{ -} - +{ } -bool Packet::add(const Packet::Entry & e) +void +Packet::add(const Packet::Entry & e) { - bool retval((_buf.size() < _limit) && (_range.to() < e.serial())); - if (retval) { - if (_buf.empty()) { - _range.from(e.serial()); - } - e.serialize(_buf); - _count++; - _range.to(e.serial()); + if (_range.to() >= e.serial()) { + throwRangeError(_range.to(), e.serial()); } - return retval; + + if (_buf.empty()) { + _range.from(e.serial()); + } + e.serialize(_buf); + _count++; + _range.to(e.serial()); } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 045bc251e24..0deceb2668a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -69,9 +69,9 @@ public: vespalib::ConstBufferRef _data; }; public: - Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { } + Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } Packet(const void * buf, size_t sz); - bool add(const Entry & data); + void add(const Entry & data); void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); } const SerialNumRange & range() const { return _range; } const vespalib::nbostream & getHandle() const { return _buf; } @@ -82,7 +82,6 @@ public: private: size_t _count; SerialNumRange _range; - size_t _limit; vespalib::nbostream_longlivedbuf _buf; }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index db26c0c1ee8..17a12ac717a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -1,12 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "domainpart.h" -#include <vespa/vespalib/util/crc.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/data/fileheader.h> #include <vespa/searchlib/common/fileheadercontext.h> #include <vespa/fastlib/io/bufferedfile.h> -#include <xxhash.h> #include <vespa/log/log.h> LOG_SETUP(".transactionlog.domainpart"); @@ -38,15 +36,15 @@ handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline)); void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); -bool addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline)); +void addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline)); bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); -bool +void addPacket(Packet &packet, const Packet::Entry &e) { LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes", e.serial(), e.type(), e.data().size(), packet.size(), packet.sizeBytes()); - return ! packet.add(e); + packet.add(e); } void @@ -158,20 +156,43 @@ handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize return retval; } -int32_t -calcCrc(Encoding::Crc version, const void * buf, size_t sz) -{ - if (version == Encoding::Crc::xxh64) { - return static_cast<int32_t>(XXH64(buf, sz, 0ll)); - } else if (version == Encoding::Crc::ccitt_crc32) { - vespalib::crc_32_type calculator; - calculator.process_bytes(buf, sz); - return calculator.checksum(); - } else { - LOG_ABORT("should not be reached"); - } } +Packet +DomainPart::readPacket(FastOS_FileInterface & transLog, SerialNumRange wanted, size_t targetSize, bool allowTruncate) { + Alloc buf; + Packet packet(targetSize); + int64_t fSize(transLog.GetSize()); + int64_t currPos(transLog.GetPosition()); + for(size_t i(0); (packet.sizeBytes() < targetSize) && (currPos < fSize) && (packet.range().to() < wanted.to()); i++) { + IChunk::UP chunk; + if (read(transLog, chunk, buf, allowTruncate)) { + if (chunk) { + try { + for (const Packet::Entry & e : chunk->getEntries()) { + if ((wanted.from() < e.serial()) && (e.serial() <= wanted.to())) { + addPacket(packet, e); + } + } + } catch (const std::exception & ex) { + throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } else { + throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } else { + if (transLog.GetSize() != fSize) { + fSize = transLog.GetSize(); + } else { + throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } + currPos = transLog.GetPosition(); + } + return packet; } int64_t @@ -204,57 +225,26 @@ DomainPart::buildPacketMapping(bool allowTruncate) handleReadError("file header", transLog, 0, FileHeader::getMinSize(), 0, allowTruncate); } } + const SerialNumRange all(0, std::numeric_limits<SerialNum>::max()); while ((currPos < fSize)) { - Packet packet; - SerialNum firstSerial(0); - SerialNum lastSerial(0); - int64_t firstPos(currPos); - bool full(false); - Alloc buf; - for(size_t i(0); !full && (currPos < fSize); i++) { - Packet::Entry e; - if (read(transLog, e, buf, allowTruncate)) { - if (e.valid()) { - if (i == 0) { - firstSerial = e.serial(); - if (currPos == _headerLen) { - _range.from(firstSerial); - } - } - try { - full = addPacket(packet, e); - if ( ! full ) { - lastSerial = e.serial(); - currPos = transLog.GetPosition(); - _sz++; - } else { - transLog.SetPosition(currPos); - } - } catch (const std::exception & ex) { - throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } else { - throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } else { - if (transLog.GetSize() != fSize) { - fSize = transLog.GetSize(); - } else { - throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } - } + const int64_t firstPos(currPos); + Packet packet = readPacket(transLog, all, TARGET_PACKET_SIZE, allowTruncate); if (!packet.empty()) { - _packets[firstSerial] = packet; - _range.to(lastSerial); + _sz += packet.size(); + const SerialNum firstSerial = packet.range().from(); + if (currPos == _headerLen) { + _range.from(firstSerial); + } + _range.to(packet.range().to()); + _packets.insert(std::make_pair(firstSerial, std::move(packet))); { LockGuard guard(_lock); _skipList.push_back(SkipInfo(firstSerial, firstPos)); } + } else { + fSize = transLog.GetSize(); } + currPos = transLog.GetPosition(); } transLog.Close(); return currPos; @@ -409,10 +399,12 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); + IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { - write(*_transLog, entry); + chunk->add(entry); + write(*_transLog, *chunk); _sz++; _range.to(entry.serial()); } else { @@ -431,7 +423,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) } } if (! merged ) { - _packets[firstSerial] = packet; + _packets.insert(std::make_pair(firstSerial, std::move(packet))); _skipList.push_back(SkipInfo(firstSerial, firstPos)); } } @@ -501,18 +493,15 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) if (e.serial() <= r.to()) { LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes", e.serial(), e.type(), e.data().size(), newPacket.size(), newPacket.sizeBytes()); - if (newPacket.add(e)) { - r.from(e.serial()); - } else { - throw runtime_error("Could not add entry to packet. Here is some mumbo jumbo. Fix."); - } + newPacket.add(e); + r.from(e.serial()); } else { // Force breakout on visiting empty interval. r.from(r.to()); } } } - packet = newPacket; + packet = std::move(newPacket); retval = next != _packets.end(); } } @@ -527,121 +516,84 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) bool DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet) { - bool retval(true); - if ( ! file.IsOpened() ) { - retval = openAndFind(file, r.from() + 1); + if ( ! file.IsOpened() && ! openAndFind(file, r.from() + 1)) { + return false; } - if (retval) { - Packet newPacket; - Alloc buf; - for (bool full(false);!full && retval && (r.from() < r.to());) { - Packet::Entry e; - int64_t fPos = file.GetPosition(); - retval = read(file, e, buf, false); - if (retval && - e.valid() && - (r.from() < e.serial()) && - (e.serial() <= r.to())) { - try { - full = addPacket(newPacket, e); - } catch (const std::exception & ex) { - throw runtime_error(make_string("%s : Failed creating packet for visit %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - ex.what(), file.GetFileName(), file.GetSize(), fPos, file.GetPosition())); - } - if ( !full ) { - r.from(e.serial()); - } else { - if ( ! file.SetPosition(fPos) ) { - throw runtime_error(make_string("Failed setting read position for file '%s' of size %" PRId64 " from %" PRId64 " to %" PRId64 ".", - file.GetFileName(), file.GetSize(), file.GetPosition(), fPos)); - } - } - } - } - packet = newPacket; + + packet = readPacket(file, r, TARGET_PACKET_SIZE, false); + if (!packet.empty()) { + r.from(packet.range().to()); } - return retval; + return ! packet.empty(); } void -DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) +DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk) { - int64_t lastKnownGoodPos(byteSize()); - int32_t crc(0); - uint32_t len(entry.serializedSize() + sizeof(crc)); nbostream os; - os << static_cast<uint8_t>(_encoding.getRaw()); - os << len; - size_t start(os.size()); - entry.serialize(os); - size_t end(os.size()); - crc = calcCrc(_encoding.getCrc(), os.data() + start, end - start); - os << crc; - size_t osSize = os.size(); - assert(osSize == len + sizeof(len) + sizeof(uint8_t)); - + size_t begin = os.wp(); + os << _encoding.getRaw(); // Placeholder for encoding + os << uint32_t(0); // Placeholder for size + Encoding realEncoding = chunk.encode(os); + size_t end = os.wp(); + os.wp(0); + os << realEncoding.getRaw(); //Patching real encoding + os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t))); // Patching actual size. + os.wp(end); + int64_t lastKnownGoodPos(file.GetPosition()); LockGuard guard(_writeLock); - if ( ! file.CheckedWrite(os.data(), osSize) ) { - throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, SerialNumRange(entry.serial(), entry.serial()), end - start)); + if ( ! file.CheckedWrite(os.data(), os.size()) ) { + throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, chunk.range(), os.size())); } - _writtenSerial = entry.serial(); - _byteSize.store(lastKnownGoodPos + osSize, std::memory_order_release); + _writtenSerial = chunk.range().to(); + _byteSize.store(lastKnownGoodPos + os.size(), std::memory_order_release); } bool -DomainPart::read(FastOS_FileInterface &file, - Packet::Entry &entry, - Alloc & buf, - bool allowTruncate) +DomainPart::read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc & buf, bool allowTruncate) { - bool retval(true); char tmp[5]; int64_t lastKnownGoodPos(file.GetPosition()); size_t rlen = file.Read(tmp, sizeof(tmp)); nbostream his(tmp, sizeof(tmp)); - uint8_t version(-1); + uint8_t encoding(-1); uint32_t len(0); - his >> version >> len; - if ((retval = (rlen == sizeof(tmp)))) { - if ( ! (retval = (version == Encoding::Crc::ccitt_crc32) || version == Encoding::Crc::xxh64)) { - string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," - " got %d from '%s' at position %" PRId64, - version, file.GetFileName(), lastKnownGoodPos)); - if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { - LOG(warning, "%s", msg.c_str()); - return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); - } else { - throw runtime_error(msg); - } - } - if (len > buf.size()) { - Alloc::alloc(len).swap(buf); - } - rlen = file.Read(buf.get(), len); - retval = rlen == len; - if (!retval) { - retval = handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate); - } else { - nbostream_longlivedbuf is(buf.get(), len); - entry.deserialize(is); - int32_t crc(0); - is >> crc; - int32_t crcVerify(calcCrc(Encoding(version).getCrc(), buf.get(), len - sizeof(crc))); - if (crc != crcVerify) { - throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d", - file.GetFileName(), file.GetPosition() - len - sizeof(len), - static_cast<int>(len), static_cast<int>(crcVerify), static_cast<int>(crc))); - } - } - } else { - if (rlen == 0) { - // Eof + his >> encoding >> len; + if (rlen != sizeof(tmp)) { + return (rlen == 0) + ? true + : handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate); + } + + try { + chunk = IChunk::create(encoding); + } catch (const std::exception & e) { + string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," + " got %d from '%s' at position %" PRId64, + encoding, file.GetFileName(), lastKnownGoodPos)); + if ((encoding == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { + LOG(warning, "%s", msg.c_str()); + return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); } else { - retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate); + throw runtime_error(msg); } } - return retval; + if (len > buf.size()) { + Alloc::alloc(len).swap(buf); + } + rlen = file.Read(buf.get(), len); + if (rlen != len) { + return handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate); + } + try { + nbostream_longlivedbuf is(buf.get(), len); + chunk->decode(is); + } catch (const std::exception & e) { + throw runtime_error(make_string("Got exception during decoding of packet '%s' from file '%s' (pos=%" PRId64 ", len=%d)", + e.what(), file.GetFileName(), file.GetPosition() - len - sizeof(len), static_cast<int>(len))); + } + return true; } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index f3a53c1e9a9..5256b731125 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -46,12 +46,13 @@ public: } bool isClosed() const; private: + using Alloc = vespalib::alloc::Alloc; bool openAndFind(FastOS_FileInterface &file, const SerialNum &from); int64_t buildPacketMapping(bool allowTruncate); + static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate); + static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate); - static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate); - - void write(FastOS_FileInterface &file, const Packet::Entry &entry); + void write(FastOS_FileInterface &file, const IChunk & entry); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo |