From a2433d41489b18bd2f1e785cd9dd15a30e609857 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 20 Oct 2017 18:28:29 +0200 Subject: Refactor Packet and read of TLS. --- .../tests/transactionlog/translogclient_test.cpp | 52 ++--- .../tests/transactionlogstress/translogstress.cpp | 7 +- .../vespa/searchlib/transactionlog/CMakeLists.txt | 2 + .../src/vespa/searchlib/transactionlog/chunks.cpp | 41 ++++ .../src/vespa/searchlib/transactionlog/chunks.h | 23 ++ .../src/vespa/searchlib/transactionlog/common.cpp | 69 +++--- .../src/vespa/searchlib/transactionlog/common.h | 8 +- .../src/vespa/searchlib/transactionlog/domain.cpp | 13 +- .../src/vespa/searchlib/transactionlog/domain.h | 3 +- .../vespa/searchlib/transactionlog/domainpart.cpp | 256 +++++++++------------ .../vespa/searchlib/transactionlog/domainpart.h | 36 ++- .../src/vespa/searchlib/transactionlog/ichunk.h | 51 ++++ .../src/vespa/searchlib/transactionlog/session.cpp | 2 +- .../searchlib/transactionlog/translogserverapp.cpp | 4 +- 14 files changed, 322 insertions(+), 245 deletions(-) create mode 100644 searchlib/src/vespa/searchlib/transactionlog/chunks.cpp create mode 100644 searchlib/src/vespa/searchlib/transactionlog/chunks.h create mode 100644 searchlib/src/vespa/searchlib/transactionlog/ichunk.h (limited to 'searchlib/src') diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 861023b79b7..b61245c8137 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include LOG_SETUP("translogclient_test"); @@ -221,6 +220,7 @@ public: IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable); +constexpr size_t DEFAULT_PACKET_SIZE = 0xf000; bool Test::partialUpdateTest() { bool retval(false); @@ -239,9 +239,8 @@ bool Test::partialUpdateTest() vespalib::ConstBufferRef bb(os.c_str(), os.size()); LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); Packet::Entry e(7, du.getClass().id(), bb); - Packet pa; + Packet pa(DEFAULT_PACKET_SIZE); pa.add(e); - pa.close(); ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().c_str(), pa.getHandle().size()))); CallBackUpdate ca; @@ -312,14 +311,12 @@ bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20)); Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20)); - Packet a; - ASSERT_TRUE (a.add(e1)); - Packet b; - ASSERT_TRUE (b.add(e2)); - ASSERT_TRUE (b.add(e3)); - ASSERT_TRUE (!b.add(e1)); - a.close(); - b.close(); + Packet a(DEFAULT_PACKET_SIZE); + a.add(e1); + Packet b(DEFAULT_PACKET_SIZE); + b.add(e2); + b.add(e3); + b.add(e1); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size()))); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().c_str(), b.getHandle().size()))); try { @@ -334,7 +331,7 @@ bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & EXPECT_EQUAL(b.size(), 2u); EXPECT_EQUAL(b.range().from(), 2u); EXPECT_EQUAL(b.range().to(), 3u); - EXPECT_TRUE(a.merge(b)); + a.merge(b); EXPECT_EQUAL(a.size(), 3u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 3u); @@ -353,41 +350,35 @@ void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_ { size_t value(0); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr p(new Packet()); + std::unique_ptr p(new Packet(DEFAULT_PACKET_SIZE)); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - if ( ! p->add(e) ) { - p->close(); + p->add(e); + if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); - p.reset(new Packet()); - ASSERT_TRUE(p->add(e)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); } } - p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); } } void -Test::fillDomainTest(TransLogClient::Session * s1, - size_t numPackets, size_t numEntries, - size_t entrySize) +Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) { size_t value(0); std::vector entryBuffer(entrySize); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr p(new Packet()); + std::unique_ptr p(new Packet(DEFAULT_PACKET_SIZE)); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size())); - if ( ! p->add(e) ) { - p->close(); + p->add(e); + if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); - p.reset(new Packet()); - ASSERT_TRUE(p->add(e)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); } } - p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); } } @@ -410,8 +401,7 @@ Test::countFiles(const vespalib::string &dir) void -Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, - size_t numEntries) +Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) { SerialNum b(0), e(0); size_t c(0); @@ -526,8 +516,8 @@ void Test::verifyDomain(const vespalib::string & name) void Test::testCrcVersions() { - createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0); - createAndFillDomain("xxh64", DomainPart::xxh64, 1); + createAndFillDomain("ccitt_crc32", Encoding::Crc::ccitt_crc32, 0); + createAndFillDomain("xxh64", Encoding::Crc::xxh64, 1); verifyDomain("ccitt_crc32"); verifyDomain("xxh64"); diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index abba84b75b6..62ef49ff689 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include @@ -221,7 +220,6 @@ FeederThread::~FeederThread() {} void FeederThread::commitPacket() { - _packet.close(); const vespalib::nbostream& stream = _packet.getHandle(); if (!_session->commit(ConstBufferRef(stream.c_str(), stream.size()))) { throw std::runtime_error(vespalib::make_string @@ -236,8 +234,9 @@ FeederThread::commitPacket() bool FeederThread::addEntry(const Packet::Entry & e) { - //LOG(info, "FeederThread: add %s", EntryPrinter::toStr(e).c_str()); - return _packet.add(e); + if (_packet.sizeBytes() > 0xf000) return false; + _packet.add(e); + return true; } void diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index d964c88fe29..0755d07b403 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -1,9 +1,11 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchlib_transactionlog OBJECT SOURCES + chunks.cpp common.cpp domain.cpp domainpart.cpp + ichunk.cpp nosyncproxy.cpp session.cpp trans_log_server_explorer.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp new file mode 100644 index 00000000000..86369cf6923 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp @@ -0,0 +1,41 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "chunks.h" +#include + +using std::runtime_error; +using std::make_unique; +using vespalib::make_string; + +namespace search::transactionlog { + +void XXH64None::onEncode(nbostream &os) { + (void) os; +} + +void XXH64None::onDecode(nbostream &is) { + if (is.size() < sizeof(int32_t)) { + throw runtime_error(make_string("Not even room for the crc. Only %zu bytes left", is.size())); + } + size_t start = is.rp(); + is.adjustReadPos(is.size() - sizeof(int32_t)); + int32_t crc(0); + is >> crc; + is.rp(start); + int32_t crcVerify = Encoding::calcCrc(Encoding::Crc::xxh64, is.c_str(), is.size() - sizeof(crc)); + if (crc != crcVerify) { + throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d", + static_cast(crcVerify), static_cast(crc))); + } +} + +void XXH64LZ4::onEncode(IChunk::nbostream &os) { + (void) os; + +} + +void XXH64LZ4::onDecode(IChunk::nbostream &is) { + (void) is; +} + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h new file mode 100644 index 00000000000..1e1d8ff23eb --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h @@ -0,0 +1,23 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "ichunk.h" + +namespace search::transactionlog { + +class XXH64None : public IChunk { +protected: + void onEncode(nbostream &os) override; + void onDecode(nbostream &is) override; +public: +}; + +class XXH64LZ4 : public IChunk { +protected: + void onEncode(nbostream &os) override; + void onDecode(nbostream &is) override; +public: +}; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index 7d919f9ad2e..9bf43c8e244 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -1,14 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "common.h" +#include #include namespace search::transactionlog { using vespalib::nbostream; using vespalib::nbostream_longlivedbuf; +using vespalib::make_string; +using std::runtime_error; -int makeDirectory(const char * dir) +namespace { + +void throwRangeError(SerialNum prev, SerialNum next) __attribute__((noinline)); + +void throwRangeError(SerialNum prev, SerialNum next) { + if (prev < next) return; + throw runtime_error(make_string("The new serialnum %zu is not higher than the old one %zu", next, prev)); +} + +} + +int +makeDirectory(const char * dir) { int retval(-1); @@ -22,7 +37,8 @@ int makeDirectory(const char * dir) return retval; } -int64_t SerialNumRange::cmp(const SerialNumRange & b) const +int64_t +SerialNumRange::cmp(const SerialNumRange & b) const { int64_t diff(0); if ( ! (contains(b) || b.contains(*this)) ) { @@ -34,7 +50,6 @@ int64_t SerialNumRange::cmp(const SerialNumRange & b) const Packet::Packet(const void * buf, size_t sz) : _count(0), _range(), - _limit(sz), _buf(static_cast(buf), sz) { nbostream_longlivedbuf os(_buf.c_str(), sz); @@ -49,21 +64,22 @@ Packet::Packet(const void * buf, size_t sz) : } } -bool Packet::merge(const Packet & packet) +void +Packet::merge(const Packet & packet) { - bool retval(_range.to() < packet.range().from()); - if (retval) { - if (_buf.empty()) { - _range.from(packet.range().from()); - } - _count += packet._count; - _range.to(packet._range.to()); - _buf.write(packet.getHandle().c_str(), packet.getHandle().size()); + if (_range.to() >= packet.range().from()) { + throwRangeError(_range.to(), packet.range().from()); } - return retval; + if (_buf.empty()) { + _range.from(packet.range().from()); + } + _count += packet._count; + _range.to(packet._range.to()); + _buf.write(packet.getHandle().c_str(), packet.getHandle().size()); } -nbostream & Packet::Entry::deserialize(nbostream & os) +nbostream & +Packet::Entry::deserialize(nbostream & os) { _valid = false; int32_t len(0); @@ -74,7 +90,8 @@ nbostream & Packet::Entry::deserialize(nbostream & os) return os; } -nbostream & Packet::Entry::serialize(nbostream & os) const +nbostream & +Packet::Entry::serialize(nbostream & os) const { os << _unique << _type << static_cast(_data.size()); os.write(_data.c_str(), _data.size()); @@ -88,19 +105,19 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) : _data(d) { } - -bool Packet::add(const Packet::Entry & e) +void +Packet::add(const Packet::Entry & e) { - bool retval((_buf.size() < _limit) && (_range.to() < e.serial())); - if (retval) { - if (_buf.empty()) { - _range.from(e.serial()); - } - e.serialize(_buf); - _count++; - _range.to(e.serial()); + if (_range.to() >= e.serial()) { + throwRangeError(_range.to(), e.serial()); } - return retval; + + if (_buf.empty()) { + _range.from(e.serial()); + } + e.serialize(_buf); + _count++; + _range.to(e.serial()); } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index db8b9727daa..0deceb2668a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -69,21 +69,19 @@ public: vespalib::ConstBufferRef _data; }; public: - Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { } + Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } Packet(const void * buf, size_t sz); - bool add(const Entry & data); - void close() { } + void add(const Entry & data); void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); } const SerialNumRange & range() const { return _range; } const vespalib::nbostream & getHandle() const { return _buf; } size_t size() const { return _count; } bool empty() const { return _count == 0; } size_t sizeBytes() const { return _buf.size(); } - bool merge(const Packet & packet); + void merge(const Packet & packet); private: size_t _count; SerialNumRange _range; - size_t _limit; vespalib::nbostream_longlivedbuf _buf; }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 3248a0aba91..101efa8cb74 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -307,15 +307,20 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } + +Domain::Chunk::Chunk() + : _data(size_t(-1)), + _callBacks(), + _firstArrivalTime() +{} + +Domain::Chunk::~Chunk() = default; void Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) { if (_callBacks.empty()) { _firstArrivalTime = steady_clock::now(); } - if ( ! _data.merge(packet) ) { - throw runtime_error(make_string("Failed merging of packet %zu into packet %zu", - packet.range().from(), _data.range().from())); - } + _data.merge(packet); _callBacks.emplace_back(std::move(onDone)); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 7eb8e201d67..fb18bb955d7 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -83,7 +83,8 @@ private: void commitIfStale(const vespalib::MonitorGuard & guard); class Chunk { public: - + Chunk(); + ~Chunk(); void add(const Packet & packet, Writer::DoneCallback onDone); size_t sizeBytes() const { return _data.sizeBytes(); } const Packet & getPacket() const { return _data; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 44f7b55882e..696b3acb095 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -2,7 +2,6 @@ #include "domainpart.h" #include -#include #include #include #include @@ -27,8 +26,7 @@ namespace search::transactionlog { namespace { -void -handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); +constexpr size_t TARGET_PACKET_SIZE = 0x3f000; string handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, @@ -38,18 +36,16 @@ bool handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline)); -bool -addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline)); +void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); +void addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline)); +bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); -bool -tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); - -bool +void addPacket(Packet &packet, const Packet::Entry &e) { LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes", e.serial(), e.type(), e.data().size(), packet.size(), packet.sizeBytes()); - return ! packet.add(e); + packet.add(e); } void @@ -163,6 +159,43 @@ handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize } +Packet +DomainPart::readPacket(FastOS_FileInterface & transLog, SerialNumRange wanted, size_t targetSize, bool allowTruncate) { + Alloc buf; + Packet packet(targetSize); + int64_t fSize(transLog.GetSize()); + int64_t currPos(transLog.GetPosition()); + for(size_t i(0); (packet.sizeBytes() < targetSize) && (currPos < fSize) && (packet.range().to() < wanted.to()); i++) { + IChunk::UP chunk; + if (read(transLog, chunk, buf, allowTruncate)) { + if (chunk) { + try { + for (const Packet::Entry & e : chunk->getEntries()) { + if ((wanted.from() < e.serial()) && (e.serial() <= wanted.to())) { + addPacket(packet, e); + } + } + } catch (const std::exception & ex) { + throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } else { + throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } else { + if (transLog.GetSize() != fSize) { + fSize = transLog.GetSize(); + } else { + throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } + currPos = transLog.GetPosition(); + } + return packet; +} + int64_t DomainPart::buildPacketMapping(bool allowTruncate) { @@ -171,7 +204,7 @@ DomainPart::buildPacketMapping(bool allowTruncate) if ( ! transLog.OpenReadOnly(_transLog->GetFileName())) { throw runtime_error(make_string("Failed opening '%s' for buffered readinf with direct io.", transLog.GetFileName())); } - int64_t fSize(transLog.GetSize()); + const int64_t fSize(transLog.GetSize()); int64_t currPos(0); try { FileHeader header; @@ -193,58 +226,24 @@ DomainPart::buildPacketMapping(bool allowTruncate) handleReadError("file header", transLog, 0, FileHeader::getMinSize(), 0, allowTruncate); } } + const SerialNumRange all(0, std::numeric_limits::max()); while ((currPos < fSize)) { - Packet packet; - SerialNum firstSerial(0); - SerialNum lastSerial(0); - int64_t firstPos(currPos); - bool full(false); - Alloc buf; - for(size_t i(0); !full && (currPos < fSize); i++) { - Packet::Entry e; - if (read(transLog, e, buf, allowTruncate)) { - if (e.valid()) { - if (i == 0) { - firstSerial = e.serial(); - if (currPos == _headerLen) { - _range.from(firstSerial); - } - } - try { - full = addPacket(packet, e); - if ( ! full ) { - lastSerial = e.serial(); - currPos = transLog.GetPosition(); - _sz++; - } else { - transLog.SetPosition(currPos); - } - } catch (const std::exception & ex) { - throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } else { - throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } else { - if (transLog.GetSize() != fSize) { - fSize = transLog.GetSize(); - } else { - throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } - } - packet.close(); + const int64_t firstPos(currPos); + Packet packet = readPacket(transLog, all, TARGET_PACKET_SIZE, allowTruncate); if (!packet.empty()) { - _packets[firstSerial] = packet; - _range.to(lastSerial); + _sz += packet.size(); + const SerialNum firstSerial = packet.range().from(); + if (currPos == _headerLen) { + _range.from(firstSerial); + } + _range.to(packet.range().to()); + _packets.insert(std::make_pair(firstSerial, std::move(packet))); { LockGuard guard(_lock); _skipList.push_back(SkipInfo(firstSerial, firstPos)); } } + currPos = _transLog->GetPosition(); } transLog.Close(); return currPos; @@ -413,15 +412,12 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) if ( ! _packets.empty() ) { Packet & lastPacket = _packets.rbegin()->second; if (lastPacket.sizeBytes() < 0xf000) { - if ( ! (merged = lastPacket.merge(packet)) ) { - LOG(error, "Failed merging packet [%" PRIu64 ", %" PRIu64 "] with [%" PRIu64 ", %" PRIu64 "]", - lastPacket.range().from(), lastPacket.range().to(), - packet.range().from(), packet.range().to()); - } + lastPacket.merge(packet); + merged = true; } } if (! merged ) { - _packets[firstSerial] = packet; + _packets.insert(std::make_pair(firstSerial, std::move(packet))); _skipList.push_back(SkipInfo(firstSerial, firstPos)); } } @@ -491,23 +487,17 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) if (e.serial() <= r.to()) { LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes", e.serial(), e.type(), e.data().size(), newPacket.size(), newPacket.sizeBytes()); - if (newPacket.add(e)) { - r.from(e.serial()); - } else { - throw runtime_error("Could not add entry to packet. Here is some mumbo jumbo. Fix."); - } + newPacket.add(e); + r.from(e.serial()); } else { // Force breakout on visiting empty interval. r.from(r.to()); } } } - newPacket.close(); - packet = newPacket; + packet = std::move(newPacket); retval = next != _packets.end(); } - } else { - packet.close(); } } else { /// File has been closed must continue from file. @@ -524,38 +514,14 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet) if ( ! file.IsOpened() ) { retval = openAndFind(file, r.from() + 1); } - if (retval) { - Packet newPacket; - Alloc buf; - for (bool full(false);!full && retval && (r.from() < r.to());) { - Packet::Entry e; - int64_t fPos = file.GetPosition(); - retval = read(file, e, buf, false); - if (retval && - e.valid() && - (r.from() < e.serial()) && - (e.serial() <= r.to())) { - try { - full = addPacket(newPacket, e); - } catch (const std::exception & ex) { - throw runtime_error(make_string("%s : Failed creating packet for visit %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - ex.what(), file.GetFileName(), file.GetSize(), fPos, file.GetPosition())); - } - if ( !full ) { - r.from(e.serial()); - } else { - if ( ! file.SetPosition(fPos) ) { - throw runtime_error(make_string("Failed setting read position for file '%s' of size %" PRId64 " from %" PRId64 " to %" PRId64 ".", - file.GetFileName(), file.GetSize(), file.GetPosition(), fPos)); - } - } - } - } - newPacket.close(); - packet = newPacket; + if ( ! retval) { + return false; } - return retval; + packet = readPacket(file, r, TARGET_PACKET_SIZE, false); + r.from(packet.range().to()); + + return true; } void @@ -570,7 +536,7 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) size_t start(os.size()); entry.serialize(os); size_t end(os.size()); - crc = calcCrc(_defaultCrc, os.c_str()+start, end - start); + crc = Encoding::calcCrc(_defaultCrc, os.c_str()+start, end - start); os << crc; size_t osSize = os.size(); assert(osSize == len + sizeof(len) + sizeof(uint8_t)); @@ -584,68 +550,54 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) } bool -DomainPart::read(FastOS_FileInterface &file, Packet::Entry &entry, Alloc & buf, bool allowTruncate) +DomainPart::read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc & buf, bool allowTruncate) { - bool retval(true); + bool retval(false); char tmp[5]; int64_t lastKnownGoodPos(file.GetPosition()); size_t rlen = file.Read(tmp, sizeof(tmp)); nbostream his(tmp, sizeof(tmp)); - uint8_t version(-1); + uint8_t encoding(-1); uint32_t len(0); - his >> version >> len; - if ((retval = (rlen == sizeof(tmp)))) { - if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) { - string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," - " got %d from '%s' at position %ld", - version, file.GetFileName(), lastKnownGoodPos)); - if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { - LOG(warning, "%s", msg.c_str()); - return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); - } else { - throw runtime_error(msg); - } - } - if (len > buf.size()) { - Alloc::alloc(len).swap(buf); - } - rlen = file.Read(buf.get(), len); - retval = rlen == len; - if (!retval) { - retval = handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate); - } else { - nbostream_longlivedbuf is(buf.get(), len); - entry.deserialize(is); - int32_t crc(0); - is >> crc; - int32_t crcVerify(calcCrc(static_cast(version), buf.get(), len - sizeof(crc))); - if (crc != crcVerify) { - throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d", - file.GetFileName(), file.GetPosition() - len - sizeof(len), - static_cast(len), static_cast(crcVerify), static_cast(crc))); - } - } - } else { + his >> encoding >> len; + if (rlen != sizeof(tmp)) { if (rlen == 0) { - // Eof + retval = true; // Eof } else { - retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate); + retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate); } + return retval; } - return retval; -} -int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz) -{ - if (version == xxh64) { - return static_cast(XXH64(buf, sz, 0ll)); - } else if (version == ccitt_crc32) { - vespalib::crc_32_type calculator; - calculator.process_bytes(buf, sz); - return calculator.checksum(); - } else { - abort(); + try { + chunk = IChunk::create(encoding); + } catch (const std::exception & e) { + string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," + " got %d from '%s' at position %ld", + encoding, file.GetFileName(), lastKnownGoodPos)); + if ((encoding == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { + LOG(warning, "%s", msg.c_str()); + return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); + } else { + throw runtime_error(msg); + } + } + if (len > buf.size()) { + Alloc::alloc(len).swap(buf); + } + rlen = file.Read(buf.get(), len); + if (rlen != len) { + return handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate); } + try { + nbostream_longlivedbuf is(buf.get(), len); + chunk->decode(is); + } catch (const std::exception & e) { + throw runtime_error(make_string("Got exception during decoding of packet '%s' from file '%s' (len pos=%" PRId64 ", len=%d)", + e.what(), file.GetFileName(), file.GetPosition() - len - sizeof(len), static_cast(len))); + } + + return true; } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 59d0df6df94..ef630904100 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -2,6 +2,7 @@ #pragma once #include "common.h" +#include "ichunk.h" #include #include #include @@ -19,10 +20,7 @@ private: DomainPart& operator=(const DomainPart &); public: - enum Crc { - ccitt_crc32=1, - xxh64=2 - }; + using Crc = Encoding::Crc; typedef std::shared_ptr SP; DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); @@ -49,13 +47,13 @@ public: } bool isClosed() const; private: + using Alloc = vespalib::alloc::Alloc; bool openAndFind(FastOS_FileInterface &file, const SerialNum &from); int64_t buildPacketMapping(bool allowTruncate); - - static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate); + static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate); + static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate); void write(FastOS_FileInterface &file, const Packet::Entry &entry); - static int32_t calcCrc(Crc crc, const void * buf, size_t len); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo @@ -77,21 +75,21 @@ private: }; typedef std::vector SkipList; typedef std::map PacketList; - const Crc _defaultCrc; - vespalib::Lock _lock; - vespalib::Lock _fileLock; - SerialNumRange _range; - size_t _sz; + const Crc _defaultCrc; + vespalib::Lock _lock; + vespalib::Lock _fileLock; + SerialNumRange _range; + size_t _sz; std::atomic _byteSize; - PacketList _packets; - vespalib::string _fileName; + PacketList _packets; + vespalib::string _fileName; std::unique_ptr _transLog; - SkipList _skipList; - uint32_t _headerLen; - vespalib::Lock _writeLock; + SkipList _skipList; + uint32_t _headerLen; + vespalib::Lock _writeLock; // Protected by _writeLock - SerialNum _writtenSerial; - SerialNum _syncedSerial; + SerialNum _writtenSerial; + SerialNum _syncedSerial; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h new file mode 100644 index 00000000000..243731d82c6 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -0,0 +1,51 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "common.h" +#include + +namespace vespalib { class nbostream; } + +namespace search::transactionlog { + +class Encoding { +public: + enum Crc { + nocrc = 0, + ccitt_crc32 = 1, + xxh64 = 2 + }; + enum Compression { + none = 0, + lz4 = 1 + }; + Encoding(uint8_t raw) : _raw(raw) {} + Encoding(Crc crc, Compression compression); + Crc getCrc() const { return Crc(_raw & 0x3); } + Compression getCompression() const { return Compression((_raw << 2) & 0xf); } + static int32_t calcCrc(Crc version, const void * buf, size_t sz); +private: + uint8_t _raw; +}; + +class IChunk { +public: + using UP = std::unique_ptr; + using Entries = std::vector; + using nbostream = vespalib::nbostream; + using ConstBufferRef = vespalib::ConstBufferRef; + virtual ~IChunk(); + const Entries & getEntries() const { return _entries; } + void add(const Packet::Entry & entry); + void encode(nbostream & os); + void decode(nbostream & buf); + static UP create(uint8_t chunkType); +protected: + virtual void onEncode(nbostream & os) = 0; + virtual void onDecode(nbostream & is) = 0; +private: + Entries _entries; +}; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index cbcbc68fdff..56aaa162485 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -36,7 +36,7 @@ Session::VisitTask::run() bool Session::visit(FastOS_FileInterface & file, DomainPart & dp) { - Packet packet; + Packet packet(size_t(-1)); bool more(false); if (dp.isClosed()) { more = dp.visit(file, _range, packet); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index ff4a402b438..c6a4342632c 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -27,9 +27,9 @@ DomainPart::Crc getCrc(searchlib::TranslogserverConfig::Crcmethod crcType) { switch (crcType) { case searchlib::TranslogserverConfig::ccitt_crc32: - return DomainPart::ccitt_crc32; + return Encoding::Crc::ccitt_crc32; case searchlib::TranslogserverConfig::xxh64: - return DomainPart::xxh64; + return Encoding::Crc::xxh64; } abort(); } -- cgit v1.2.3