summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Henning Baldersheim <balder@yahoo-inc.com>, 2020-09-04 12:42:59 +0200
committer: GitHub <noreply@github.com>, 2020-09-04 12:42:59 +0200
commit: cdf17e68e3e9df196705ebe80b5002f6e23fca19 (patch)
tree: 19cabca511b8d2c60d0a7a325eacb6d500bffa1c
parent: 503b0ff5037f0db031b3410899e1f9cccb23bd0a (diff)
parent: c9f9adce2b5352c2b5d4653d1e08bde9e7f707b7 (diff)
Merge pull request #14284 from vespa-engine/balder/mandatory-error-check-in-add
Balder/mandatory error check in add
-rw-r--r-- searchlib/src/tests/transactionlog/translogclient_test.cpp | 2
-rw-r--r-- searchlib/src/vespa/searchlib/transactionlog/common.cpp | 30
-rw-r--r-- searchlib/src/vespa/searchlib/transactionlog/common.h | 5
-rw-r--r-- searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp | 276
-rw-r--r-- searchlib/src/vespa/searchlib/transactionlog/domainpart.h | 7
5 files changed, 135 insertions, 185 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index d003bad0582..b7eb56d1fd9 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -251,7 +251,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
Packet b(DEFAULT_PACKET_SIZE);
b.add(e2);
b.add(e3);
- EXPECT_FALSE(b.add(e1));
+ EXPECT_EXCEPTION(b.add(e1), std::runtime_error, "");
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())));
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size())));
EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())),
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
index 40a065277be..afaaa349b64 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
@@ -9,7 +9,7 @@ namespace search::transactionlog {
using vespalib::nbostream;
using vespalib::nbostream_longlivedbuf;
-using vespalib::make_string;
+using vespalib::make_string_short::fmt;
using std::runtime_error;
namespace {
@@ -18,7 +18,7 @@ void throwRangeError(SerialNum prev, SerialNum next) __attribute__((noinline));
void
throwRangeError(SerialNum prev, SerialNum next) {
- throw runtime_error(make_string("The new serialnum %" PRIu64 " is not higher than the old one %" PRIu64 "", next, prev));
+ throw runtime_error(fmt("The new serialnum %" PRIu64 " is not higher than the old one %" PRIu64 "", next, prev));
}
}
@@ -51,7 +51,6 @@ SerialNumRange::cmp(const SerialNumRange & b) const
Packet::Packet(const void * buf, size_t sz) :
_count(0),
_range(),
- _limit(sz),
_buf(static_cast<const char *>(buf), sz)
{
nbostream_longlivedbuf os(_buf.data(), sz);
@@ -105,22 +104,21 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) :
_type(t),
_valid(true),
_data(d)
-{
-}
-
+{ }
-bool Packet::add(const Packet::Entry & e)
+void
+Packet::add(const Packet::Entry & e)
{
- bool retval((_buf.size() < _limit) && (_range.to() < e.serial()));
- if (retval) {
- if (_buf.empty()) {
- _range.from(e.serial());
- }
- e.serialize(_buf);
- _count++;
- _range.to(e.serial());
+ if (_range.to() >= e.serial()) {
+ throwRangeError(_range.to(), e.serial());
}
- return retval;
+
+ if (_buf.empty()) {
+ _range.from(e.serial());
+ }
+ e.serialize(_buf);
+ _count++;
+ _range.to(e.serial());
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index 045bc251e24..0deceb2668a 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -69,9 +69,9 @@ public:
vespalib::ConstBufferRef _data;
};
public:
- Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { }
+ Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { }
Packet(const void * buf, size_t sz);
- bool add(const Entry & data);
+ void add(const Entry & data);
void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); }
const SerialNumRange & range() const { return _range; }
const vespalib::nbostream & getHandle() const { return _buf; }
@@ -82,7 +82,6 @@ public:
private:
size_t _count;
SerialNumRange _range;
- size_t _limit;
vespalib::nbostream_longlivedbuf _buf;
};
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index db26c0c1ee8..17a12ac717a 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -1,12 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "domainpart.h"
-#include <vespa/vespalib/util/crc.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/data/fileheader.h>
#include <vespa/searchlib/common/fileheadercontext.h>
#include <vespa/fastlib/io/bufferedfile.h>
-#include <xxhash.h>
#include <vespa/log/log.h>
LOG_SETUP(".transactionlog.domainpart");
@@ -38,15 +36,15 @@ handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize
int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline));
void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline));
-bool addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline));
+void addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline));
bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline));
-bool
+void
addPacket(Packet &packet, const Packet::Entry &e)
{
LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes",
e.serial(), e.type(), e.data().size(), packet.size(), packet.sizeBytes());
- return ! packet.add(e);
+ packet.add(e);
}
void
@@ -158,20 +156,43 @@ handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize
return retval;
}
-int32_t
-calcCrc(Encoding::Crc version, const void * buf, size_t sz)
-{
- if (version == Encoding::Crc::xxh64) {
- return static_cast<int32_t>(XXH64(buf, sz, 0ll));
- } else if (version == Encoding::Crc::ccitt_crc32) {
- vespalib::crc_32_type calculator;
- calculator.process_bytes(buf, sz);
- return calculator.checksum();
- } else {
- LOG_ABORT("should not be reached");
- }
}
+Packet
+DomainPart::readPacket(FastOS_FileInterface & transLog, SerialNumRange wanted, size_t targetSize, bool allowTruncate) {
+ Alloc buf;
+ Packet packet(targetSize);
+ int64_t fSize(transLog.GetSize());
+ int64_t currPos(transLog.GetPosition());
+ for(size_t i(0); (packet.sizeBytes() < targetSize) && (currPos < fSize) && (packet.range().to() < wanted.to()); i++) {
+ IChunk::UP chunk;
+ if (read(transLog, chunk, buf, allowTruncate)) {
+ if (chunk) {
+ try {
+ for (const Packet::Entry & e : chunk->getEntries()) {
+ if ((wanted.from() < e.serial()) && (e.serial() <= wanted.to())) {
+ addPacket(packet, e);
+ }
+ }
+ } catch (const std::exception & ex) {
+ throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
+ } else {
+ throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
+ } else {
+ if (transLog.GetSize() != fSize) {
+ fSize = transLog.GetSize();
+ } else {
+ throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
+ }
+ currPos = transLog.GetPosition();
+ }
+ return packet;
}
int64_t
@@ -204,57 +225,26 @@ DomainPart::buildPacketMapping(bool allowTruncate)
handleReadError("file header", transLog, 0, FileHeader::getMinSize(), 0, allowTruncate);
}
}
+ const SerialNumRange all(0, std::numeric_limits<SerialNum>::max());
while ((currPos < fSize)) {
- Packet packet;
- SerialNum firstSerial(0);
- SerialNum lastSerial(0);
- int64_t firstPos(currPos);
- bool full(false);
- Alloc buf;
- for(size_t i(0); !full && (currPos < fSize); i++) {
- Packet::Entry e;
- if (read(transLog, e, buf, allowTruncate)) {
- if (e.valid()) {
- if (i == 0) {
- firstSerial = e.serial();
- if (currPos == _headerLen) {
- _range.from(firstSerial);
- }
- }
- try {
- full = addPacket(packet, e);
- if ( ! full ) {
- lastSerial = e.serial();
- currPos = transLog.GetPosition();
- _sz++;
- } else {
- transLog.SetPosition(currPos);
- }
- } catch (const std::exception & ex) {
- throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- } else {
- throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- } else {
- if (transLog.GetSize() != fSize) {
- fSize = transLog.GetSize();
- } else {
- throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- }
- }
+ const int64_t firstPos(currPos);
+ Packet packet = readPacket(transLog, all, TARGET_PACKET_SIZE, allowTruncate);
if (!packet.empty()) {
- _packets[firstSerial] = packet;
- _range.to(lastSerial);
+ _sz += packet.size();
+ const SerialNum firstSerial = packet.range().from();
+ if (currPos == _headerLen) {
+ _range.from(firstSerial);
+ }
+ _range.to(packet.range().to());
+ _packets.insert(std::make_pair(firstSerial, std::move(packet)));
{
LockGuard guard(_lock);
_skipList.push_back(SkipInfo(firstSerial, firstPos));
}
+ } else {
+ fSize = transLog.GetSize();
}
+ currPos = transLog.GetPosition();
}
transLog.Close();
return currPos;
@@ -409,10 +399,12 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
//LOG(spam,
//"Pos(%d) Len(%d), Lim(%d), Remaining(%d)",
//h.getPos(), h.getLength(), h.getLimit(), h.getRemaining());
+ IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
Packet::Entry entry;
entry.deserialize(h);
if (_range.to() < entry.serial()) {
- write(*_transLog, entry);
+ chunk->add(entry);
+ write(*_transLog, *chunk);
_sz++;
_range.to(entry.serial());
} else {
@@ -431,7 +423,7 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
}
}
if (! merged ) {
- _packets[firstSerial] = packet;
+ _packets.insert(std::make_pair(firstSerial, std::move(packet)));
_skipList.push_back(SkipInfo(firstSerial, firstPos));
}
}
@@ -501,18 +493,15 @@ DomainPart::visit(SerialNumRange &r, Packet &packet)
if (e.serial() <= r.to()) {
LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes",
e.serial(), e.type(), e.data().size(), newPacket.size(), newPacket.sizeBytes());
- if (newPacket.add(e)) {
- r.from(e.serial());
- } else {
- throw runtime_error("Could not add entry to packet. Here is some mumbo jumbo. Fix.");
- }
+ newPacket.add(e);
+ r.from(e.serial());
} else {
// Force breakout on visiting empty interval.
r.from(r.to());
}
}
}
- packet = newPacket;
+ packet = std::move(newPacket);
retval = next != _packets.end();
}
}
@@ -527,121 +516,84 @@ DomainPart::visit(SerialNumRange &r, Packet &packet)
bool
DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet)
{
- bool retval(true);
- if ( ! file.IsOpened() ) {
- retval = openAndFind(file, r.from() + 1);
+ if ( ! file.IsOpened() && ! openAndFind(file, r.from() + 1)) {
+ return false;
}
- if (retval) {
- Packet newPacket;
- Alloc buf;
- for (bool full(false);!full && retval && (r.from() < r.to());) {
- Packet::Entry e;
- int64_t fPos = file.GetPosition();
- retval = read(file, e, buf, false);
- if (retval &&
- e.valid() &&
- (r.from() < e.serial()) &&
- (e.serial() <= r.to())) {
- try {
- full = addPacket(newPacket, e);
- } catch (const std::exception & ex) {
- throw runtime_error(make_string("%s : Failed creating packet for visit %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- ex.what(), file.GetFileName(), file.GetSize(), fPos, file.GetPosition()));
- }
- if ( !full ) {
- r.from(e.serial());
- } else {
- if ( ! file.SetPosition(fPos) ) {
- throw runtime_error(make_string("Failed setting read position for file '%s' of size %" PRId64 " from %" PRId64 " to %" PRId64 ".",
- file.GetFileName(), file.GetSize(), file.GetPosition(), fPos));
- }
- }
- }
- }
- packet = newPacket;
+
+ packet = readPacket(file, r, TARGET_PACKET_SIZE, false);
+ if (!packet.empty()) {
+ r.from(packet.range().to());
}
- return retval;
+ return ! packet.empty();
}
void
-DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry)
+DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk)
{
- int64_t lastKnownGoodPos(byteSize());
- int32_t crc(0);
- uint32_t len(entry.serializedSize() + sizeof(crc));
nbostream os;
- os << static_cast<uint8_t>(_encoding.getRaw());
- os << len;
- size_t start(os.size());
- entry.serialize(os);
- size_t end(os.size());
- crc = calcCrc(_encoding.getCrc(), os.data() + start, end - start);
- os << crc;
- size_t osSize = os.size();
- assert(osSize == len + sizeof(len) + sizeof(uint8_t));
-
+ size_t begin = os.wp();
+ os << _encoding.getRaw(); // Placeholder for encoding
+ os << uint32_t(0); // Placeholder for size
+ Encoding realEncoding = chunk.encode(os);
+ size_t end = os.wp();
+ os.wp(0);
+ os << realEncoding.getRaw(); //Patching real encoding
+ os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t))); // Patching actual size.
+ os.wp(end);
+ int64_t lastKnownGoodPos(file.GetPosition());
LockGuard guard(_writeLock);
- if ( ! file.CheckedWrite(os.data(), osSize) ) {
- throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, SerialNumRange(entry.serial(), entry.serial()), end - start));
+ if ( ! file.CheckedWrite(os.data(), os.size()) ) {
+ throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, chunk.range(), os.size()));
}
- _writtenSerial = entry.serial();
- _byteSize.store(lastKnownGoodPos + osSize, std::memory_order_release);
+ _writtenSerial = chunk.range().to();
+ _byteSize.store(lastKnownGoodPos + os.size(), std::memory_order_release);
}
bool
-DomainPart::read(FastOS_FileInterface &file,
- Packet::Entry &entry,
- Alloc & buf,
- bool allowTruncate)
+DomainPart::read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc & buf, bool allowTruncate)
{
- bool retval(true);
char tmp[5];
int64_t lastKnownGoodPos(file.GetPosition());
size_t rlen = file.Read(tmp, sizeof(tmp));
nbostream his(tmp, sizeof(tmp));
- uint8_t version(-1);
+ uint8_t encoding(-1);
uint32_t len(0);
- his >> version >> len;
- if ((retval = (rlen == sizeof(tmp)))) {
- if ( ! (retval = (version == Encoding::Crc::ccitt_crc32) || version == Encoding::Crc::xxh64)) {
- string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
- " got %d from '%s' at position %" PRId64,
- version, file.GetFileName(), lastKnownGoodPos));
- if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
- LOG(warning, "%s", msg.c_str());
- return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
- } else {
- throw runtime_error(msg);
- }
- }
- if (len > buf.size()) {
- Alloc::alloc(len).swap(buf);
- }
- rlen = file.Read(buf.get(), len);
- retval = rlen == len;
- if (!retval) {
- retval = handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate);
- } else {
- nbostream_longlivedbuf is(buf.get(), len);
- entry.deserialize(is);
- int32_t crc(0);
- is >> crc;
- int32_t crcVerify(calcCrc(Encoding(version).getCrc(), buf.get(), len - sizeof(crc)));
- if (crc != crcVerify) {
- throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d",
- file.GetFileName(), file.GetPosition() - len - sizeof(len),
- static_cast<int>(len), static_cast<int>(crcVerify), static_cast<int>(crc)));
- }
- }
- } else {
- if (rlen == 0) {
- // Eof
+ his >> encoding >> len;
+ if (rlen != sizeof(tmp)) {
+ return (rlen == 0)
+ ? true
+ : handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate);
+ }
+
+ try {
+ chunk = IChunk::create(encoding);
+ } catch (const std::exception & e) {
+ string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
+ " got %d from '%s' at position %" PRId64,
+ encoding, file.GetFileName(), lastKnownGoodPos));
+ if ((encoding == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
+ LOG(warning, "%s", msg.c_str());
+ return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
} else {
- retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate);
+ throw runtime_error(msg);
}
}
- return retval;
+ if (len > buf.size()) {
+ Alloc::alloc(len).swap(buf);
+ }
+ rlen = file.Read(buf.get(), len);
+ if (rlen != len) {
+ return handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate);
+ }
+ try {
+ nbostream_longlivedbuf is(buf.get(), len);
+ chunk->decode(is);
+ } catch (const std::exception & e) {
+ throw runtime_error(make_string("Got exception during decoding of packet '%s' from file '%s' (pos=%" PRId64 ", len=%d)",
+ e.what(), file.GetFileName(), file.GetPosition() - len - sizeof(len), static_cast<int>(len)));
+ }
+ return true;
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
index f3a53c1e9a9..5256b731125 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
@@ -46,12 +46,13 @@ public:
}
bool isClosed() const;
private:
+ using Alloc = vespalib::alloc::Alloc;
bool openAndFind(FastOS_FileInterface &file, const SerialNum &from);
int64_t buildPacketMapping(bool allowTruncate);
+ static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate);
+ static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate);
- static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate);
-
- void write(FastOS_FileInterface &file, const Packet::Entry &entry);
+ void write(FastOS_FileInterface &file, const IChunk & entry);
void writeHeader(const common::FileHeaderContext &fileHeaderContext);
class SkipInfo