aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-20 18:28:29 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-01-10 09:56:23 +0100
commita2433d41489b18bd2f1e785cd9dd15a30e609857 (patch)
tree6314bacd22b418094305dc7a301fced01b2e90f5 /searchlib/src
parent11c077cb1d5e9b0672331c865cd4ec9fe879e171 (diff)
Refactor Packet and read of TLS.
Diffstat (limited to 'searchlib/src')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp52
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp7
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/chunks.cpp41
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/chunks.h23
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.cpp69
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h8
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp13
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h3
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp256
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.h36
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h51
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp4
14 files changed, 322 insertions, 245 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index 861023b79b7..b61245c8137 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -5,7 +5,6 @@
#include <vespa/vespalib/objects/identifiable.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/fastos/file.h>
-#include <map>
#include <vespa/log/log.h>
LOG_SETUP("translogclient_test");
@@ -221,6 +220,7 @@ public:
IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable);
+constexpr size_t DEFAULT_PACKET_SIZE = 0xf000;
bool Test::partialUpdateTest()
{
bool retval(false);
@@ -239,9 +239,8 @@ bool Test::partialUpdateTest()
vespalib::ConstBufferRef bb(os.c_str(), os.size());
LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str());
Packet::Entry e(7, du.getClass().id(), bb);
- Packet pa;
+ Packet pa(DEFAULT_PACKET_SIZE);
pa.add(e);
- pa.close();
ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().c_str(), pa.getHandle().size())));
CallBackUpdate ca;
@@ -312,14 +311,12 @@ bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string &
Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20));
Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20));
- Packet a;
- ASSERT_TRUE (a.add(e1));
- Packet b;
- ASSERT_TRUE (b.add(e2));
- ASSERT_TRUE (b.add(e3));
- ASSERT_TRUE (!b.add(e1));
- a.close();
- b.close();
+ Packet a(DEFAULT_PACKET_SIZE);
+ a.add(e1);
+ Packet b(DEFAULT_PACKET_SIZE);
+ b.add(e2);
+ b.add(e3);
+ b.add(e1);
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size())));
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().c_str(), b.getHandle().size())));
try {
@@ -334,7 +331,7 @@ bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string &
EXPECT_EQUAL(b.size(), 2u);
EXPECT_EQUAL(b.range().from(), 2u);
EXPECT_EQUAL(b.range().to(), 3u);
- EXPECT_TRUE(a.merge(b));
+ a.merge(b);
EXPECT_EQUAL(a.size(), 3u);
EXPECT_EQUAL(a.range().from(), 1u);
EXPECT_EQUAL(a.range().to(), 3u);
@@ -353,41 +350,35 @@ void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_
{
size_t value(0);
for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet());
+ std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
for(size_t j=0; j < numEntries; j++, value++) {
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
- if ( ! p->add(e) ) {
- p->close();
+ p->add(e);
+ if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size())));
- p.reset(new Packet());
- ASSERT_TRUE(p->add(e));
+ p.reset(new Packet(DEFAULT_PACKET_SIZE));
}
}
- p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size())));
}
}
void
-Test::fillDomainTest(TransLogClient::Session * s1,
- size_t numPackets, size_t numEntries,
- size_t entrySize)
+Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
{
size_t value(0);
std::vector<char> entryBuffer(entrySize);
for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet());
+ std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
for(size_t j=0; j < numEntries; j++, value++) {
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size()));
- if ( ! p->add(e) ) {
- p->close();
+ p->add(e);
+ if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size())));
- p.reset(new Packet());
- ASSERT_TRUE(p->add(e));
+ p.reset(new Packet(DEFAULT_PACKET_SIZE));
}
}
- p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size())));
}
}
@@ -410,8 +401,7 @@ Test::countFiles(const vespalib::string &dir)
void
-Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1,
- size_t numEntries)
+Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
{
SerialNum b(0), e(0);
size_t c(0);
@@ -526,8 +516,8 @@ void Test::verifyDomain(const vespalib::string & name)
void Test::testCrcVersions()
{
- createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0);
- createAndFillDomain("xxh64", DomainPart::xxh64, 1);
+ createAndFillDomain("ccitt_crc32", Encoding::Crc::ccitt_crc32, 0);
+ createAndFillDomain("xxh64", Encoding::Crc::xxh64, 1);
verifyDomain("ccitt_crc32");
verifyDomain("xxh64");
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index abba84b75b6..62ef49ff689 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -8,7 +8,6 @@
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/fastos/app.h>
#include <iostream>
-#include <stdexcept>
#include <sstream>
#include <vespa/log/log.h>
@@ -221,7 +220,6 @@ FeederThread::~FeederThread() {}
void
FeederThread::commitPacket()
{
- _packet.close();
const vespalib::nbostream& stream = _packet.getHandle();
if (!_session->commit(ConstBufferRef(stream.c_str(), stream.size()))) {
throw std::runtime_error(vespalib::make_string
@@ -236,8 +234,9 @@ FeederThread::commitPacket()
bool
FeederThread::addEntry(const Packet::Entry & e)
{
- //LOG(info, "FeederThread: add %s", EntryPrinter::toStr(e).c_str());
- return _packet.add(e);
+ if (_packet.sizeBytes() > 0xf000) return false;
+ _packet.add(e);
+ return true;
}
void
diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
index d964c88fe29..0755d07b403 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
@@ -1,9 +1,11 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(searchlib_transactionlog OBJECT
SOURCES
+ chunks.cpp
common.cpp
domain.cpp
domainpart.cpp
+ ichunk.cpp
nosyncproxy.cpp
session.cpp
trans_log_server_explorer.cpp
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
new file mode 100644
index 00000000000..86369cf6923
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
@@ -0,0 +1,41 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "chunks.h"
+#include <vespa/vespalib/util/stringfmt.h>
+
+using std::runtime_error;
+using std::make_unique;
+using vespalib::make_string;
+
+namespace search::transactionlog {
+
+void XXH64None::onEncode(nbostream &os) {
+ (void) os;
+}
+
+void XXH64None::onDecode(nbostream &is) {
+ if (is.size() < sizeof(int32_t)) {
+ throw runtime_error(make_string("Not even room for the crc. Only %zu bytes left", is.size()));
+ }
+ size_t start = is.rp();
+ is.adjustReadPos(is.size() - sizeof(int32_t));
+ int32_t crc(0);
+ is >> crc;
+ is.rp(start);
+ int32_t crcVerify = Encoding::calcCrc(Encoding::Crc::xxh64, is.c_str(), is.size() - sizeof(crc));
+ if (crc != crcVerify) {
+ throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d",
+ static_cast<int>(crcVerify), static_cast<int>(crc)));
+ }
+}
+
+void XXH64LZ4::onEncode(IChunk::nbostream &os) {
+ (void) os;
+
+}
+
+void XXH64LZ4::onDecode(IChunk::nbostream &is) {
+ (void) is;
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h
new file mode 100644
index 00000000000..1e1d8ff23eb
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h
@@ -0,0 +1,23 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "ichunk.h"
+
+namespace search::transactionlog {
+
+class XXH64None : public IChunk {
+protected:
+ void onEncode(nbostream &os) override;
+ void onDecode(nbostream &is) override;
+public:
+};
+
+class XXH64LZ4 : public IChunk {
+protected:
+ void onEncode(nbostream &os) override;
+ void onDecode(nbostream &is) override;
+public:
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
index 7d919f9ad2e..9bf43c8e244 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
@@ -1,14 +1,29 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "common.h"
+#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fastos/file.h>
namespace search::transactionlog {
using vespalib::nbostream;
using vespalib::nbostream_longlivedbuf;
+using vespalib::make_string;
+using std::runtime_error;
-int makeDirectory(const char * dir)
+namespace {
+
+void throwRangeError(SerialNum prev, SerialNum next) __attribute__((noinline));
+
+void throwRangeError(SerialNum prev, SerialNum next) {
+ if (prev < next) return;
+ throw runtime_error(make_string("The new serialnum %zu is not higher than the old one %zu", next, prev));
+}
+
+}
+
+int
+makeDirectory(const char * dir)
{
int retval(-1);
@@ -22,7 +37,8 @@ int makeDirectory(const char * dir)
return retval;
}
-int64_t SerialNumRange::cmp(const SerialNumRange & b) const
+int64_t
+SerialNumRange::cmp(const SerialNumRange & b) const
{
int64_t diff(0);
if ( ! (contains(b) || b.contains(*this)) ) {
@@ -34,7 +50,6 @@ int64_t SerialNumRange::cmp(const SerialNumRange & b) const
Packet::Packet(const void * buf, size_t sz) :
_count(0),
_range(),
- _limit(sz),
_buf(static_cast<const char *>(buf), sz)
{
nbostream_longlivedbuf os(_buf.c_str(), sz);
@@ -49,21 +64,22 @@ Packet::Packet(const void * buf, size_t sz) :
}
}
-bool Packet::merge(const Packet & packet)
+void
+Packet::merge(const Packet & packet)
{
- bool retval(_range.to() < packet.range().from());
- if (retval) {
- if (_buf.empty()) {
- _range.from(packet.range().from());
- }
- _count += packet._count;
- _range.to(packet._range.to());
- _buf.write(packet.getHandle().c_str(), packet.getHandle().size());
+ if (_range.to() >= packet.range().from()) {
+ throwRangeError(_range.to(), packet.range().from());
}
- return retval;
+ if (_buf.empty()) {
+ _range.from(packet.range().from());
+ }
+ _count += packet._count;
+ _range.to(packet._range.to());
+ _buf.write(packet.getHandle().c_str(), packet.getHandle().size());
}
-nbostream & Packet::Entry::deserialize(nbostream & os)
+nbostream &
+Packet::Entry::deserialize(nbostream & os)
{
_valid = false;
int32_t len(0);
@@ -74,7 +90,8 @@ nbostream & Packet::Entry::deserialize(nbostream & os)
return os;
}
-nbostream & Packet::Entry::serialize(nbostream & os) const
+nbostream &
+Packet::Entry::serialize(nbostream & os) const
{
os << _unique << _type << static_cast<uint32_t>(_data.size());
os.write(_data.c_str(), _data.size());
@@ -88,19 +105,19 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) :
_data(d)
{ }
-
-bool Packet::add(const Packet::Entry & e)
+void
+Packet::add(const Packet::Entry & e)
{
- bool retval((_buf.size() < _limit) && (_range.to() < e.serial()));
- if (retval) {
- if (_buf.empty()) {
- _range.from(e.serial());
- }
- e.serialize(_buf);
- _count++;
- _range.to(e.serial());
+ if (_range.to() >= e.serial()) {
+ throwRangeError(_range.to(), e.serial());
}
- return retval;
+
+ if (_buf.empty()) {
+ _range.from(e.serial());
+ }
+ e.serialize(_buf);
+ _count++;
+ _range.to(e.serial());
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index db8b9727daa..0deceb2668a 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -69,21 +69,19 @@ public:
vespalib::ConstBufferRef _data;
};
public:
- Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { }
+ Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { }
Packet(const void * buf, size_t sz);
- bool add(const Entry & data);
- void close() { }
+ void add(const Entry & data);
void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); }
const SerialNumRange & range() const { return _range; }
const vespalib::nbostream & getHandle() const { return _buf; }
size_t size() const { return _count; }
bool empty() const { return _count == 0; }
size_t sizeBytes() const { return _buf.size(); }
- bool merge(const Packet & packet);
+ void merge(const Packet & packet);
private:
size_t _count;
SerialNumRange _range;
- size_t _limit;
vespalib::nbostream_longlivedbuf _buf;
};
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 3248a0aba91..101efa8cb74 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -307,15 +307,20 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
}
+
+Domain::Chunk::Chunk()
+ : _data(size_t(-1)),
+ _callBacks(),
+ _firstArrivalTime()
+{}
+
+Domain::Chunk::~Chunk() = default;
void
Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) {
if (_callBacks.empty()) {
_firstArrivalTime = steady_clock::now();
}
- if ( ! _data.merge(packet) ) {
- throw runtime_error(make_string("Failed merging of packet %zu into packet %zu",
- packet.range().from(), _data.range().from()));
- }
+ _data.merge(packet);
_callBacks.emplace_back(std::move(onDone));
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 7eb8e201d67..fb18bb955d7 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -83,7 +83,8 @@ private:
void commitIfStale(const vespalib::MonitorGuard & guard);
class Chunk {
public:
-
+ Chunk();
+ ~Chunk();
void add(const Packet & packet, Writer::DoneCallback onDone);
size_t sizeBytes() const { return _data.sizeBytes(); }
const Packet & getPacket() const { return _data; }
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 44f7b55882e..696b3acb095 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -2,7 +2,6 @@
#include "domainpart.h"
#include <vespa/vespalib/util/crc.h>
-#include <vespa/vespalib/xxhash/xxhash.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/data/fileheader.h>
#include <vespa/searchlib/common/fileheadercontext.h>
@@ -27,8 +26,7 @@ namespace search::transactionlog {
namespace {
-void
-handleSync(FastOS_FileInterface &file) __attribute__ ((noinline));
+constexpr size_t TARGET_PACKET_SIZE = 0x3f000;
string
handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos,
@@ -38,18 +36,16 @@ bool
handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen,
int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline));
-bool
-addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline));
+void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline));
+void addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline));
+bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline));
-bool
-tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline));
-
-bool
+void
addPacket(Packet &packet, const Packet::Entry &e)
{
LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes",
e.serial(), e.type(), e.data().size(), packet.size(), packet.sizeBytes());
- return ! packet.add(e);
+ packet.add(e);
}
void
@@ -163,6 +159,43 @@ handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize
}
+Packet
+DomainPart::readPacket(FastOS_FileInterface & transLog, SerialNumRange wanted, size_t targetSize, bool allowTruncate) {
+ Alloc buf;
+ Packet packet(targetSize);
+ int64_t fSize(transLog.GetSize());
+ int64_t currPos(transLog.GetPosition());
+ for(size_t i(0); (packet.sizeBytes() < targetSize) && (currPos < fSize) && (packet.range().to() < wanted.to()); i++) {
+ IChunk::UP chunk;
+ if (read(transLog, chunk, buf, allowTruncate)) {
+ if (chunk) {
+ try {
+ for (const Packet::Entry & e : chunk->getEntries()) {
+ if ((wanted.from() < e.serial()) && (e.serial() <= wanted.to())) {
+ addPacket(packet, e);
+ }
+ }
+ } catch (const std::exception & ex) {
+ throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
+ } else {
+ throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
+ } else {
+ if (transLog.GetSize() != fSize) {
+ fSize = transLog.GetSize();
+ } else {
+ throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
+ }
+ currPos = transLog.GetPosition();
+ }
+ return packet;
+}
+
int64_t
DomainPart::buildPacketMapping(bool allowTruncate)
{
@@ -171,7 +204,7 @@ DomainPart::buildPacketMapping(bool allowTruncate)
if ( ! transLog.OpenReadOnly(_transLog->GetFileName())) {
throw runtime_error(make_string("Failed opening '%s' for buffered readinf with direct io.", transLog.GetFileName()));
}
- int64_t fSize(transLog.GetSize());
+ const int64_t fSize(transLog.GetSize());
int64_t currPos(0);
try {
FileHeader header;
@@ -193,58 +226,24 @@ DomainPart::buildPacketMapping(bool allowTruncate)
handleReadError("file header", transLog, 0, FileHeader::getMinSize(), 0, allowTruncate);
}
}
+ const SerialNumRange all(0, std::numeric_limits<SerialNum>::max());
while ((currPos < fSize)) {
- Packet packet;
- SerialNum firstSerial(0);
- SerialNum lastSerial(0);
- int64_t firstPos(currPos);
- bool full(false);
- Alloc buf;
- for(size_t i(0); !full && (currPos < fSize); i++) {
- Packet::Entry e;
- if (read(transLog, e, buf, allowTruncate)) {
- if (e.valid()) {
- if (i == 0) {
- firstSerial = e.serial();
- if (currPos == _headerLen) {
- _range.from(firstSerial);
- }
- }
- try {
- full = addPacket(packet, e);
- if ( ! full ) {
- lastSerial = e.serial();
- currPos = transLog.GetPosition();
- _sz++;
- } else {
- transLog.SetPosition(currPos);
- }
- } catch (const std::exception & ex) {
- throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- } else {
- throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- } else {
- if (transLog.GetSize() != fSize) {
- fSize = transLog.GetSize();
- } else {
- throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- }
- }
- packet.close();
+ const int64_t firstPos(currPos);
+ Packet packet = readPacket(transLog, all, TARGET_PACKET_SIZE, allowTruncate);
if (!packet.empty()) {
- _packets[firstSerial] = packet;
- _range.to(lastSerial);
+ _sz += packet.size();
+ const SerialNum firstSerial = packet.range().from();
+ if (currPos == _headerLen) {
+ _range.from(firstSerial);
+ }
+ _range.to(packet.range().to());
+ _packets.insert(std::make_pair(firstSerial, std::move(packet)));
{
LockGuard guard(_lock);
_skipList.push_back(SkipInfo(firstSerial, firstPos));
}
}
+ currPos = _transLog->GetPosition();
}
transLog.Close();
return currPos;
@@ -413,15 +412,12 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
if ( ! _packets.empty() ) {
Packet & lastPacket = _packets.rbegin()->second;
if (lastPacket.sizeBytes() < 0xf000) {
- if ( ! (merged = lastPacket.merge(packet)) ) {
- LOG(error, "Failed merging packet [%" PRIu64 ", %" PRIu64 "] with [%" PRIu64 ", %" PRIu64 "]",
- lastPacket.range().from(), lastPacket.range().to(),
- packet.range().from(), packet.range().to());
- }
+ lastPacket.merge(packet);
+ merged = true;
}
}
if (! merged ) {
- _packets[firstSerial] = packet;
+ _packets.insert(std::make_pair(firstSerial, std::move(packet)));
_skipList.push_back(SkipInfo(firstSerial, firstPos));
}
}
@@ -491,23 +487,17 @@ DomainPart::visit(SerialNumRange &r, Packet &packet)
if (e.serial() <= r.to()) {
LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes",
e.serial(), e.type(), e.data().size(), newPacket.size(), newPacket.sizeBytes());
- if (newPacket.add(e)) {
- r.from(e.serial());
- } else {
- throw runtime_error("Could not add entry to packet. Here is some mumbo jumbo. Fix.");
- }
+ newPacket.add(e);
+ r.from(e.serial());
} else {
// Force breakout on visiting empty interval.
r.from(r.to());
}
}
}
- newPacket.close();
- packet = newPacket;
+ packet = std::move(newPacket);
retval = next != _packets.end();
}
- } else {
- packet.close();
}
} else {
/// File has been closed must continue from file.
@@ -524,38 +514,14 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet)
if ( ! file.IsOpened() ) {
retval = openAndFind(file, r.from() + 1);
}
- if (retval) {
- Packet newPacket;
- Alloc buf;
- for (bool full(false);!full && retval && (r.from() < r.to());) {
- Packet::Entry e;
- int64_t fPos = file.GetPosition();
- retval = read(file, e, buf, false);
- if (retval &&
- e.valid() &&
- (r.from() < e.serial()) &&
- (e.serial() <= r.to())) {
- try {
- full = addPacket(newPacket, e);
- } catch (const std::exception & ex) {
- throw runtime_error(make_string("%s : Failed creating packet for visit %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- ex.what(), file.GetFileName(), file.GetSize(), fPos, file.GetPosition()));
- }
- if ( !full ) {
- r.from(e.serial());
- } else {
- if ( ! file.SetPosition(fPos) ) {
- throw runtime_error(make_string("Failed setting read position for file '%s' of size %" PRId64 " from %" PRId64 " to %" PRId64 ".",
- file.GetFileName(), file.GetSize(), file.GetPosition(), fPos));
- }
- }
- }
- }
- newPacket.close();
- packet = newPacket;
+ if ( ! retval) {
+ return false;
}
- return retval;
+ packet = readPacket(file, r, TARGET_PACKET_SIZE, false);
+ r.from(packet.range().to());
+
+ return true;
}
void
@@ -570,7 +536,7 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry)
size_t start(os.size());
entry.serialize(os);
size_t end(os.size());
- crc = calcCrc(_defaultCrc, os.c_str()+start, end - start);
+ crc = Encoding::calcCrc(_defaultCrc, os.c_str()+start, end - start);
os << crc;
size_t osSize = os.size();
assert(osSize == len + sizeof(len) + sizeof(uint8_t));
@@ -584,68 +550,54 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry)
}
bool
-DomainPart::read(FastOS_FileInterface &file, Packet::Entry &entry, Alloc & buf, bool allowTruncate)
+DomainPart::read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc & buf, bool allowTruncate)
{
- bool retval(true);
+ bool retval(false);
char tmp[5];
int64_t lastKnownGoodPos(file.GetPosition());
size_t rlen = file.Read(tmp, sizeof(tmp));
nbostream his(tmp, sizeof(tmp));
- uint8_t version(-1);
+ uint8_t encoding(-1);
uint32_t len(0);
- his >> version >> len;
- if ((retval = (rlen == sizeof(tmp)))) {
- if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) {
- string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
- " got %d from '%s' at position %ld",
- version, file.GetFileName(), lastKnownGoodPos));
- if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
- LOG(warning, "%s", msg.c_str());
- return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
- } else {
- throw runtime_error(msg);
- }
- }
- if (len > buf.size()) {
- Alloc::alloc(len).swap(buf);
- }
- rlen = file.Read(buf.get(), len);
- retval = rlen == len;
- if (!retval) {
- retval = handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate);
- } else {
- nbostream_longlivedbuf is(buf.get(), len);
- entry.deserialize(is);
- int32_t crc(0);
- is >> crc;
- int32_t crcVerify(calcCrc(static_cast<Crc>(version), buf.get(), len - sizeof(crc)));
- if (crc != crcVerify) {
- throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d",
- file.GetFileName(), file.GetPosition() - len - sizeof(len),
- static_cast<int>(len), static_cast<int>(crcVerify), static_cast<int>(crc)));
- }
- }
- } else {
+ his >> encoding >> len;
+ if (rlen != sizeof(tmp)) {
if (rlen == 0) {
- // Eof
+ retval = true; // Eof
} else {
- retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate);
+ retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate);
}
+ return retval;
}
- return retval;
-}
-int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz)
-{
- if (version == xxh64) {
- return static_cast<int32_t>(XXH64(buf, sz, 0ll));
- } else if (version == ccitt_crc32) {
- vespalib::crc_32_type calculator;
- calculator.process_bytes(buf, sz);
- return calculator.checksum();
- } else {
- abort();
+ try {
+ chunk = IChunk::create(encoding);
+ } catch (const std::exception & e) {
+ string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
+ " got %d from '%s' at position %ld",
+ encoding, file.GetFileName(), lastKnownGoodPos));
+ if ((encoding == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
+ LOG(warning, "%s", msg.c_str());
+ return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
+ } else {
+ throw runtime_error(msg);
+ }
+ }
+ if (len > buf.size()) {
+ Alloc::alloc(len).swap(buf);
+ }
+ rlen = file.Read(buf.get(), len);
+ if (rlen != len) {
+ return handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate);
}
+ try {
+ nbostream_longlivedbuf is(buf.get(), len);
+ chunk->decode(is);
+ } catch (const std::exception & e) {
+ throw runtime_error(make_string("Got exception during decoding of packet '%s' from file '%s' (len pos=%" PRId64 ", len=%d)",
+ e.what(), file.GetFileName(), file.GetPosition() - len - sizeof(len), static_cast<int>(len)));
+ }
+
+ return true;
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
index 59d0df6df94..ef630904100 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
@@ -2,6 +2,7 @@
#pragma once
#include "common.h"
+#include "ichunk.h"
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/memory.h>
#include <map>
@@ -19,10 +20,7 @@ private:
DomainPart& operator=(const DomainPart &);
public:
- enum Crc {
- ccitt_crc32=1,
- xxh64=2
- };
+ using Crc = Encoding::Crc;
typedef std::shared_ptr<DomainPart> SP;
DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc,
const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
@@ -49,13 +47,13 @@ public:
}
bool isClosed() const;
private:
+ using Alloc = vespalib::alloc::Alloc;
bool openAndFind(FastOS_FileInterface &file, const SerialNum &from);
int64_t buildPacketMapping(bool allowTruncate);
-
- static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate);
+ static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate);
+ static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate);
void write(FastOS_FileInterface &file, const Packet::Entry &entry);
- static int32_t calcCrc(Crc crc, const void * buf, size_t len);
void writeHeader(const common::FileHeaderContext &fileHeaderContext);
class SkipInfo
@@ -77,21 +75,21 @@ private:
};
typedef std::vector<SkipInfo> SkipList;
typedef std::map<SerialNum, Packet> PacketList;
- const Crc _defaultCrc;
- vespalib::Lock _lock;
- vespalib::Lock _fileLock;
- SerialNumRange _range;
- size_t _sz;
+ const Crc _defaultCrc;
+ vespalib::Lock _lock;
+ vespalib::Lock _fileLock;
+ SerialNumRange _range;
+ size_t _sz;
std::atomic<uint64_t> _byteSize;
- PacketList _packets;
- vespalib::string _fileName;
+ PacketList _packets;
+ vespalib::string _fileName;
std::unique_ptr<FastOS_FileInterface> _transLog;
- SkipList _skipList;
- uint32_t _headerLen;
- vespalib::Lock _writeLock;
+ SkipList _skipList;
+ uint32_t _headerLen;
+ vespalib::Lock _writeLock;
// Protected by _writeLock
- SerialNum _writtenSerial;
- SerialNum _syncedSerial;
+ SerialNum _writtenSerial;
+ SerialNum _syncedSerial;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
new file mode 100644
index 00000000000..243731d82c6
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -0,0 +1,51 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "common.h"
+#include <memory>
+
+namespace vespalib { class nbostream; }
+
+namespace search::transactionlog {
+
+class Encoding {
+public:
+ enum Crc {
+ nocrc = 0,
+ ccitt_crc32 = 1,
+ xxh64 = 2
+ };
+ enum Compression {
+ none = 0,
+ lz4 = 1
+ };
+ Encoding(uint8_t raw) : _raw(raw) {}
+ Encoding(Crc crc, Compression compression);
+ Crc getCrc() const { return Crc(_raw & 0x3); }
+ Compression getCompression() const { return Compression((_raw << 2) & 0xf); }
+ static int32_t calcCrc(Crc version, const void * buf, size_t sz);
+private:
+ uint8_t _raw;
+};
+
+class IChunk {
+public:
+ using UP = std::unique_ptr<IChunk>;
+ using Entries = std::vector<Packet::Entry>;
+ using nbostream = vespalib::nbostream;
+ using ConstBufferRef = vespalib::ConstBufferRef;
+ virtual ~IChunk();
+ const Entries & getEntries() const { return _entries; }
+ void add(const Packet::Entry & entry);
+ void encode(nbostream & os);
+ void decode(nbostream & buf);
+ static UP create(uint8_t chunkType);
+protected:
+ virtual void onEncode(nbostream & os) = 0;
+ virtual void onDecode(nbostream & is) = 0;
+private:
+ Entries _entries;
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index cbcbc68fdff..56aaa162485 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -36,7 +36,7 @@ Session::VisitTask::run()
bool
Session::visit(FastOS_FileInterface & file, DomainPart & dp) {
- Packet packet;
+ Packet packet(size_t(-1));
bool more(false);
if (dp.isClosed()) {
more = dp.visit(file, _range, packet);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
index ff4a402b438..c6a4342632c 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
@@ -27,9 +27,9 @@ DomainPart::Crc getCrc(searchlib::TranslogserverConfig::Crcmethod crcType)
{
switch (crcType) {
case searchlib::TranslogserverConfig::ccitt_crc32:
- return DomainPart::ccitt_crc32;
+ return Encoding::Crc::ccitt_crc32;
case searchlib::TranslogserverConfig::xxh64:
- return DomainPart::xxh64;
+ return Encoding::Crc::xxh64;
}
abort();
}