diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-11 21:59:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-11 21:59:35 +0100 |
commit | bd891f6ff6e029dca4083ca9205df8f36bda09ed (patch) | |
tree | c415ae0f405c89f156f01bce6ded49cbf350ae60 /searchlib | |
parent | 5525fa9c0384b195f7e2cbf66078db6cc8d7df66 (diff) | |
parent | 3d4e8b924f4bf0bcf3001964d313289f47780042 (diff) |
Merge pull request #4601 from vespa-engine/balder/group-multiple-commits-rebased-1
Balder/group multiple commits rebased 1
Diffstat (limited to 'searchlib')
19 files changed, 905 insertions, 416 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 861023b79b7..5b28fdc6567 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -5,7 +5,6 @@ #include <vespa/vespalib/objects/identifiable.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/fastos/file.h> -#include <map> #include <vespa/log/log.h> LOG_SETUP("translogclient_test"); @@ -45,7 +44,7 @@ private: bool partialUpdateTest(); bool testVisitOverGeneratedDomain(); bool testRemove(); - void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains); + void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains); void verifyDomain(const vespalib::string & name); void testCrcVersions(); bool testVisitOverPreExistingDomain(); @@ -221,11 +220,12 @@ public: IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable); +constexpr size_t DEFAULT_PACKET_SIZE = 0xf000; bool Test::partialUpdateTest() { bool retval(false); DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); @@ -239,9 +239,8 @@ bool Test::partialUpdateTest() vespalib::ConstBufferRef bb(os.c_str(), os.size()); LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); Packet::Entry e(7, du.getClass().id(), bb); - Packet pa; + Packet pa(DEFAULT_PACKET_SIZE); pa.add(e); - pa.close(); ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().c_str(), pa.getHandle().size()))); CallBackUpdate ca; @@ -312,29 +311,24 @@ bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20)); Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20)); - Packet a; - ASSERT_TRUE (a.add(e1)); - Packet b; - ASSERT_TRUE (b.add(e2)); - ASSERT_TRUE (b.add(e3)); - ASSERT_TRUE (!b.add(e1)); - a.close(); - b.close(); + Packet a(DEFAULT_PACKET_SIZE); + a.add(e1); + Packet b(DEFAULT_PACKET_SIZE); + b.add(e2); + b.add(e3); + EXPECT_EXCEPTION(b.add(e1), std::runtime_error, ""); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size()))); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().c_str(), b.getHandle().size()))); - try { - s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size())); - ASSERT_TRUE(false); - } catch (const std::exception & e) { - EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what()); - } + EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size())), + std::runtime_error, + "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."); EXPECT_EQUAL(a.size(), 1u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 1u); EXPECT_EQUAL(b.size(), 2u); EXPECT_EQUAL(b.range().from(), 2u); EXPECT_EQUAL(b.range().to(), 3u); - EXPECT_TRUE(a.merge(b)); + a.merge(b); EXPECT_EQUAL(a.size(), 3u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 3u); @@ -353,41 +347,35 @@ void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_ { size_t value(0); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet()); + std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - if ( ! p->add(e) ) { - p->close(); + p->add(e); + if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); - p.reset(new Packet()); - ASSERT_TRUE(p->add(e)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); } } - p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); } } void -Test::fillDomainTest(TransLogClient::Session * s1, - size_t numPackets, size_t numEntries, - size_t entrySize) +Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) { size_t value(0); std::vector<char> entryBuffer(entrySize); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet()); + std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size())); - if ( ! p->add(e) ) { - p->close(); + p->add(e); + if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); - p.reset(new Packet()); - ASSERT_TRUE(p->add(e)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); } } - p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); } } @@ -410,8 +398,7 @@ Test::countFiles(const vespalib::string &dir) void -Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, - size_t numEntries) +Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) { SerialNum b(0), e(0); size_t c(0); @@ -489,7 +476,7 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain) bool Test::testVisitOverGeneratedDomain() { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -504,10 +491,11 @@ bool Test::testVisitOverGeneratedDomain() return true; } -void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains) +void Test::createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains) { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, crcMethod); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, + DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, name, preExistingDomains); @@ -518,7 +506,7 @@ void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc cr void Test::verifyDomain(const vespalib::string & name) { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); @@ -526,8 +514,8 @@ void Test::verifyDomain(const vespalib::string & name) void Test::testCrcVersions() { - createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0); - createAndFillDomain("xxh64", DomainPart::xxh64, 1); + createAndFillDomain("ccitt_crc32", Encoding::Crc::ccitt_crc32, 0); + createAndFillDomain("xxh64", Encoding::Crc::xxh64, 1); verifyDomain("ccitt_crc32"); verifyDomain("xxh64"); @@ -536,7 +524,7 @@ void Test::testCrcVersions() bool Test::testRemove() { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test-delete"); @@ -553,7 +541,7 @@ bool Test::testVisitOverPreExistingDomain() { // Depends on Test::testVisitOverGeneratedDomain() DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -607,7 +595,7 @@ void Test::testMany() const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "many", 0); @@ -630,7 +618,7 @@ void Test::testMany() } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); @@ -658,7 +646,7 @@ void Test::testErase() const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); @@ -667,7 +655,7 @@ void Test::testErase() } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); @@ -757,7 +745,7 @@ Test::testSync() const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -782,7 +770,7 @@ Test::testTruncateOnVersionMismatch() size_t countOld(0); DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -803,7 +791,7 @@ Test::testTruncateOnVersionMismatch() EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp))); EXPECT_TRUE(f.Close()); { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); @@ -829,7 +817,7 @@ Test::testTruncateOnShortRead() DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); @@ -845,7 +833,7 @@ Test::testTruncateOnShortRead() EXPECT_EQUAL(2u, countFiles(dir)); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); @@ -861,7 +849,7 @@ Test::testTruncateOnShortRead() trfile.Close(); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index abba84b75b6..6f2581d3799 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -8,7 +8,6 @@ #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/fastos/app.h> #include <iostream> -#include <stdexcept> #include <sstream> #include <vespa/log/log.h> @@ -221,7 +220,6 @@ FeederThread::~FeederThread() {} void FeederThread::commitPacket() { - _packet.close(); const vespalib::nbostream& stream = _packet.getHandle(); if (!_session->commit(ConstBufferRef(stream.c_str(), stream.size()))) { throw std::runtime_error(vespalib::make_string @@ -236,8 +234,9 @@ FeederThread::commitPacket() bool FeederThread::addEntry(const Packet::Entry & e) { - //LOG(info, "FeederThread: add %s", EntryPrinter::toStr(e).c_str()); - return _packet.add(e); + if (_packet.sizeBytes() > 0xf000) return false; + _packet.add(e); + return true; } void @@ -699,7 +698,7 @@ TransLogStress::Main() // start transaction log server DummyFileHeaderContext fileHeaderContext; - TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize); + TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); TransLogClient client(tlsSpec); client.create(domain); diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index 74efe3fe68e..b1bdb3765b1 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -5,7 +5,7 @@ namespace=searchlib listenport int default=13700 restart ## Max file size (50M) -filesizemax int default=50000000 restart +filesizemax int default=50000000 ## Server name to identify server. servername string default="tls" restart @@ -22,3 +22,17 @@ maxthreads int default=4 restart ##Default crc method used crcmethod enum {ccitt_crc32, xxh64} default=xxh64 + +## Control compression type. +compression.type enum {NONE, LZ4, ZSTD} default=LZ4 + +## Control compression level +## LZ4 has normal range 1..9 while ZSTD has range 1..19 +## 9 is a reasonable default for both +compression.level int default=9 + +## How large a chunk can grow in memory before beeing flushed +chunk.sizelimit int default = 256000 # 256k + +## How long a chunk can reside in memory befor ebeeing flushed to disk. +chunk.agelimit double default = 0.0010 # 10 milliseconds diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index d964c88fe29..0755d07b403 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -1,9 +1,11 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchlib_transactionlog OBJECT SOURCES + chunks.cpp common.cpp domain.cpp domainpart.cpp + ichunk.cpp nosyncproxy.cpp session.cpp trans_log_server_explorer.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp new file mode 100644 index 00000000000..dea0ccf9b9a --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp @@ -0,0 +1,128 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "chunks.h" +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/compressor.h> +#include <vespa/vespalib/data/databuffer.h> + +using std::runtime_error; +using std::make_unique; +using vespalib::make_string; +using vespalib::compression::compress; +using vespalib::compression::decompress; +using vespalib::compression::CompressionConfig; +using vespalib::DataBuffer; +using vespalib::ConstBufferRef; +using vespalib::nbostream; + +namespace search::transactionlog { + +namespace { +void verifyCrc(nbostream & is, Encoding::Crc crcType) { + if (is.size() < sizeof(int32_t) * 2) { + throw runtime_error(make_string("Not even room for the crc and length. Only %zu bytes left", is.size())); + } + size_t start = is.rp(); + is.adjustReadPos(is.size() - sizeof(int32_t)); + int32_t crc(0); + is >> crc; + is.rp(start); + int32_t crcVerify = Encoding::calcCrc(crcType, is.c_str(), is.size() - sizeof(crc)); + if (crc != crcVerify) { + throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d", + static_cast<int>(crcVerify), static_cast<int>(crc))); + } + is.rp(start); +} + +Encoding::Compression +toCompression(CompressionConfig::Type type) { + switch (type) { + case CompressionConfig::ZSTD: + return Encoding::Compression::zstd; + case CompressionConfig::LZ4: + return Encoding::Compression::lz4; + case CompressionConfig::NONE: + return Encoding::Compression::none; + default: + abort(); + } +} + +} + +Encoding +CCITTCRC32None::onEncode(nbostream &os) const { + size_t start = os.wp(); + assert(getEntries().size() == 1); + serializeEntries(os); + os << int32_t(Encoding::calcCrc(Encoding::Crc::ccitt_crc32, os.c_str()+start, os.size() - start)); + return Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none); +} + +void CCITTCRC32None::onDecode(nbostream &is) { + verifyCrc(is, Encoding::Crc::ccitt_crc32); + nbostream data(is.peek(), is.size() - sizeof(int32_t)); + deserializeEntries(data); + is.adjustReadPos(is.size()); +} + +Encoding +XXH64None::onEncode(nbostream &os) const { + size_t start = os.wp(); + assert(getEntries().size() == 1); + serializeEntries(os); + os << int32_t(Encoding::calcCrc(Encoding::Crc::xxh64, os.c_str()+start, os.size() - start)); + return Encoding(Encoding::Crc::xxh64, Encoding::Compression::none); +} + +void XXH64None::onDecode(nbostream &is) { + verifyCrc(is, Encoding::Crc::xxh64); + nbostream data(is.peek(), is.size() - sizeof(int32_t)); + deserializeEntries(data); + is.adjustReadPos(is.size()); +} + +void +XXH64Compressed::decompress(nbostream & is) { + uint32_t uncompressedLen; + is >> uncompressedLen; + vespalib::DataBuffer uncompressed; + ConstBufferRef compressed(is.peek(), is.size()-sizeof(uint32_t)*2); + ::decompress(_type, uncompressedLen, compressed, uncompressed, false); + nbostream data(uncompressed.getData(), uncompressed.getDataLen()); + deserializeEntries(data); + is.adjustReadPos(is.size()); +} + +XXH64Compressed::XXH64Compressed(CompressionConfig::Type type, uint8_t level) + : _type(type), + _level(level) +{ } + +Encoding +XXH64Compressed::compress(nbostream & os, Encoding::Crc crc) const { + nbostream org; + serializeEntries(org); + DataBuffer compressed; + CompressionConfig cfg(_type, _level, 80); + ConstBufferRef uncompressed(org.c_str(), org.size()); + Encoding::Compression actual = toCompression(::compress(cfg, uncompressed, compressed, false)); + size_t start = os.wp(); + os.write(compressed.getData(), compressed.getDataLen()); + os << int32_t(Encoding::calcCrc(crc, os.c_str()+start, os.size() - start)); + return Encoding(Encoding::Crc::xxh64, actual); +} + +Encoding +XXH64Compressed::onEncode(IChunk::nbostream &os) const { + return compress(os, Encoding::Crc::xxh64); +} + +void +XXH64Compressed::onDecode(IChunk::nbostream &is) { + verifyCrc(is, Encoding::Crc::xxh64); + decompress(is); +} + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h new file mode 100644 index 00000000000..cf88bc0a3ed --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/chunks.h @@ -0,0 +1,38 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "ichunk.h" +#include <vespa/vespalib/util/compressionconfig.h> + +namespace search::transactionlog { + +class XXH64None : public IChunk { +protected: + Encoding onEncode(nbostream &os) const override; + void onDecode(nbostream &is) override; +public: +}; + +class CCITTCRC32None : public IChunk { +protected: + Encoding onEncode(nbostream &os) const override; + void onDecode(nbostream &is) override; +public: +}; + +class XXH64Compressed : public IChunk { +public: + using CompressionConfig = vespalib::compression::CompressionConfig; + XXH64Compressed(CompressionConfig::Type, uint8_t level); +protected: + void decompress(nbostream & os); + Encoding compress(nbostream & os, Encoding::Crc crc) const; + Encoding onEncode(nbostream &os) const override; + void onDecode(nbostream &is) override; +private: + CompressionConfig::Type _type; + uint8_t _level; +}; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index a84e27b2e53..9bf43c8e244 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -1,14 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "common.h" +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/fastos/file.h> namespace search::transactionlog { using vespalib::nbostream; using vespalib::nbostream_longlivedbuf; +using vespalib::make_string; +using std::runtime_error; -int makeDirectory(const char * dir) +namespace { + +void throwRangeError(SerialNum prev, SerialNum next) __attribute__((noinline)); + +void throwRangeError(SerialNum prev, SerialNum next) { + if (prev < next) return; + throw runtime_error(make_string("The new serialnum %zu is not higher than the old one %zu", next, prev)); +} + +} + +int +makeDirectory(const char * dir) { int retval(-1); @@ -22,7 +37,8 @@ int makeDirectory(const char * dir) return retval; } -int64_t SerialNumRange::cmp(const SerialNumRange & b) const +int64_t +SerialNumRange::cmp(const SerialNumRange & b) const { int64_t diff(0); if ( ! (contains(b) || b.contains(*this)) ) { @@ -34,7 +50,6 @@ int64_t SerialNumRange::cmp(const SerialNumRange & b) const Packet::Packet(const void * buf, size_t sz) : _count(0), _range(), - _limit(sz), _buf(static_cast<const char *>(buf), sz) { nbostream_longlivedbuf os(_buf.c_str(), sz); @@ -49,18 +64,22 @@ Packet::Packet(const void * buf, size_t sz) : } } -bool Packet::merge(const Packet & packet) +void +Packet::merge(const Packet & packet) { - bool retval(_range.to() < packet._range.from()); - if (retval) { - _count += packet._count; - _range.to(packet._range.to()); - _buf.write(packet.getHandle().c_str(), packet.getHandle().size()); + if (_range.to() >= packet.range().from()) { + throwRangeError(_range.to(), packet.range().from()); } - return retval; + if (_buf.empty()) { + _range.from(packet.range().from()); + } + _count += packet._count; + _range.to(packet._range.to()); + _buf.write(packet.getHandle().c_str(), packet.getHandle().size()); } -nbostream & Packet::Entry::deserialize(nbostream & os) +nbostream & +Packet::Entry::deserialize(nbostream & os) { _valid = false; int32_t len(0); @@ -71,7 +90,8 @@ nbostream & Packet::Entry::deserialize(nbostream & os) return os; } -nbostream & Packet::Entry::serialize(nbostream & os) const +nbostream & +Packet::Entry::serialize(nbostream & os) const { os << _unique << _type << static_cast<uint32_t>(_data.size()); os.write(_data.c_str(), _data.size()); @@ -83,22 +103,21 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) : _type(t), _valid(true), _data(d) -{ -} - +{ } -bool Packet::add(const Packet::Entry & e) +void +Packet::add(const Packet::Entry & e) { - bool retval((_buf.size() < _limit) && (_range.to() < e.serial())); - if (retval) { - if (_buf.empty()) { - _range.from(e.serial()); - } - e.serialize(_buf); - _count++; - _range.to(e.serial()); + if (_range.to() >= e.serial()) { + throwRangeError(_range.to(), e.serial()); } - return retval; + + if (_buf.empty()) { + _range.from(e.serial()); + } + e.serialize(_buf); + _count++; + _range.to(e.serial()); } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index db8b9727daa..0deceb2668a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -69,21 +69,19 @@ public: vespalib::ConstBufferRef _data; }; public: - Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { } + Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } Packet(const void * buf, size_t sz); - bool add(const Entry & data); - void close() { } + void add(const Entry & data); void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); } const SerialNumRange & range() const { return _range; } const vespalib::nbostream & getHandle() const { return _buf; } size_t size() const { return _count; } bool empty() const { return _count == 0; } size_t sizeBytes() const { return _buf.size(); } - bool merge(const Packet & packet); + void merge(const Packet & packet); private: size_t _count; SerialNumRange _range; - size_t _limit; vespalib::nbostream_longlivedbuf _buf; }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 88c2dd9ecc3..c03543e5599 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -20,28 +20,41 @@ using vespalib::MonitorGuard; using search::common::FileHeaderContext; using std::runtime_error; using namespace std::chrono_literals; +using namespace std::chrono; +using std::make_shared; namespace search::transactionlog { -Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, - Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, - const FileHeaderContext &fileHeaderContext) : - _defaultCrcType(defaultCrcType), - _commitExecutor(commitExecutor), - _sessionExecutor(sessionExecutor), - _sessionId(1), - _syncMonitor(), - _pendingSync(false), - _name(domainName), - _domainPartSize(domainPartSize), - _parts(), - _lock(), - _sessionLock(), - _sessions(), - _maxSessionRunTime(), - _baseDir(baseDir), - _fileHeaderContext(fileHeaderContext), - _markedDeleted(false) +DomainConfig::DomainConfig() + : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), + _compressionLevel(9), + _partSizeLimit(0x10000000), // 256M + _chunkSizeLimit(0x40000), // 256k + _chunkAgeLimit(1ms) +{ } +Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool, + Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, + const FileHeaderContext &fileHeaderContext) + : _config(cfg), + _currentChunk(std::make_unique<Chunk>()), + _lastSerial(0), + _threadPool(threadPool), + _commitExecutor(commitExecutor), + _sessionExecutor(sessionExecutor), + _sessionId(1), + _syncMonitor(), + _pendingSync(false), + _name(domainName), + _parts(), + _lock(), + _currentChunkMonitor(), + _sessionLock(), + _sessions(), + _maxSessionRunTime(), + _baseDir(baseDir), + _fileHeaderContext(fileHeaderContext), + _markedDeleted(false), + _self(nullptr) { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { @@ -59,12 +72,32 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm } _sessionExecutor.sync(); if (_parts.empty() || _parts.crbegin()->second->isClosed()) { - _parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false)); + _parts[lastPart] = make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, false); } + _lastSerial = end(); + _self = _threadPool.NewThread(this); + assert(_self); } +Domain & +Domain::setConfig(const DomainConfig & cfg) { + _config = cfg; + return *this; +} + +void +Domain::Run(FastOS_ThreadInterface *thisThread, void *) { + + while (!thisThread->GetBreakFlag()) { + vespalib::MonitorGuard guard(_currentChunkMonitor); + guard.wait(duration_cast<milliseconds>(_config.getChunkAgeLimit()).count()); + commitIfStale(guard); + } +} void Domain::addPart(int64_t partId, bool isLastPart) { - DomainPart::SP dp(new DomainPart(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart)); + auto dp = make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, isLastPart); if (dp->size() == 0) { // Only last domain part is allowed to be truncated down to // empty size. @@ -102,7 +135,16 @@ private: bool & _pendingSync; }; -Domain::~Domain() { } +Domain::~Domain() { + if (_self) { + _self->SetBreakFlag(); + { + MonitorGuard guard(_currentChunkMonitor); + guard.broadcast(); + } + _self->Join(); + } +} DomainInfo Domain::getDomainInfo() const @@ -129,7 +171,7 @@ Domain::begin(const LockGuard & guard) const assert(guard.locks(_lock)); SerialNum s(0); if ( ! _parts.empty() ) { - s = _parts.begin()->second->range().from(); + s = _parts.cbegin()->second->range().from(); } return s; } @@ -147,7 +189,7 @@ Domain::end(const LockGuard & guard) const assert(guard.locks(_lock)); SerialNum s(0); if ( ! _parts.empty() ) { - s = _parts.rbegin()->second->range().to(); + s = _parts.crbegin()->second->range().to(); } return s; } @@ -267,7 +309,8 @@ void Domain::cleanSessions() namespace { -void waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) +void +waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) { MonitorGuard guard(syncMonitor); while (pendingSync) { @@ -277,18 +320,82 @@ void waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } -void Domain::commit(const Packet & packet) + +Domain::Chunk::Chunk() + : _data(size_t(-1)), + _callBacks(), + _firstArrivalTime() +{} + +Domain::Chunk::~Chunk() = default; +void +Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) { + if (_callBacks.empty()) { + _firstArrivalTime = steady_clock::now(); + } + _data.merge(packet); + _callBacks.emplace_back(std::move(onDone)); +} + +microseconds +Domain::Chunk::age() const { + if (_callBacks.empty()) { + return 0ms; + } + return duration_cast<microseconds>(steady_clock::now() - _firstArrivalTime); +} + +void +Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { + std::unique_ptr<Chunk> completed; + vespalib::MonitorGuard guard(_currentChunkMonitor); + if (! (_lastSerial < packet.range().from())) { + throw runtime_error(make_string("Incomming serial number(%ld) must be bigger than the last one (%ld).", + packet.range().from(), _lastSerial)); + } else { + _lastSerial = packet.range().to(); + } + _currentChunk->add(packet, std::move(onDone)); + if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { + completed = grabCurrentChunk(guard); + } + if (completed) { + commitChunk(std::move(_currentChunk), guard); + } +} + +std::unique_ptr<Domain::Chunk> +Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + auto chunk = std::move(_currentChunk); + _currentChunk = std::make_unique<Chunk>(); + return chunk; +} + +void +Domain::commitIfStale(const vespalib::MonitorGuard & guard) { + assert(guard.monitors(_currentChunkMonitor)); + if (_currentChunk->age() > _config.getChunkAgeLimit()) { + commitChunk(grabCurrentChunk(guard), guard); + } +} + +void +Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) { + assert(chunkOrderGuard.monitors(_currentChunkMonitor)); + const Packet & packet = chunk->getPacket(); DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().c_str(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); - if (dp->byteSize() > _domainPartSize) { + if (dp->byteSize() > _config.getPartSizeLimit()) { waitPendingSync(_syncMonitor, _pendingSync); triggerSyncNow(); waitPendingSync(_syncMonitor, _pendingSync); dp->close(); - dp.reset(new DomainPart(_name, dir(), entry.serial(), _defaultCrcType, _fileHeaderContext, false)); + dp = make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, false); { LockGuard guard(_lock); _parts[entry.serial()] = dp; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index c1ff9157a6f..553117959c7 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -8,16 +8,39 @@ namespace search::transactionlog { +class DomainConfig { +public: + using microseconds = std::chrono::microseconds; + DomainConfig(); + DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } + DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } + DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } + DomainConfig & setChunkAgeLimit(microseconds v) { _chunkAgeLimit = v; return *this; } + DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } + Encoding getEncoding() const { return _encoding; } + size_t getPartSizeLimit() const { return _partSizeLimit; } + size_t getChunkSizeLimit() const { return _chunkSizeLimit; } + microseconds getChunkAgeLimit() const { return _chunkAgeLimit; } + uint8_t getCompressionlevel() const { return _compressionLevel; } +private: + Encoding _encoding; + uint8_t _compressionLevel; + size_t _partSizeLimit; + size_t _chunkSizeLimit; + microseconds _chunkAgeLimit; +}; + struct PartInfo { SerialNumRange range; size_t numEntries; size_t byteSize; vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, - size_t byteSize_in, - vespalib::stringref file_in) - : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), - file(file_in) {} + PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) + : range(range_in), + numEntries(numEntries_in), + byteSize(byteSize_in), + file(file_in) + {} }; struct DomainInfo { @@ -35,22 +58,22 @@ struct DomainInfo { typedef std::map<vespalib::string, DomainInfo> DomainStats; -class Domain +class Domain final : public FastOS_Runnable { public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::ThreadExecutor; - Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, - Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, + Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool, + Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); - virtual ~Domain(); + ~Domain() override; DomainInfo getDomainInfo() const; const vespalib::string & name() const { return _name; } bool erase(SerialNum to); - void commit(const Packet & packet); + void commit(const Packet & packet, Writer::DoneCallback onDone); int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn); SerialNum begin() const; @@ -77,8 +100,26 @@ public: return _sessionExecutor.execute(std::move(task)); } uint64_t size() const; - + Domain & setConfig(const DomainConfig & cfg); private: + void Run(FastOS_ThreadInterface *thisThread, void *arguments) override; + void commitIfStale(const vespalib::MonitorGuard & guard); + class Chunk { + public: + Chunk(); + ~Chunk(); + void add(const Packet & packet, Writer::DoneCallback onDone); + size_t sizeBytes() const { return _data.sizeBytes(); } + const Packet & getPacket() const { return _data; } + std::chrono::microseconds age() const; + private: + Packet _data; + std::vector<Writer::DoneCallback> _callBacks; + std::chrono::steady_clock::time_point _firstArrivalTime; + }; + + std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); + void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; @@ -95,22 +136,26 @@ private: using DomainPartList = std::map<int64_t, DomainPart::SP>; using DurationSeconds = std::chrono::duration<double>; - DomainPart::Crc _defaultCrcType; - Executor & _commitExecutor; - Executor & _sessionExecutor; - std::atomic<int> _sessionId; - vespalib::Monitor _syncMonitor; - bool _pendingSync; - vespalib::string _name; - uint64_t _domainPartSize; - DomainPartList _parts; - vespalib::Lock _lock; - vespalib::Lock _sessionLock; - SessionList _sessions; - DurationSeconds _maxSessionRunTime; - vespalib::string _baseDir; + DomainConfig _config; + std::unique_ptr<Chunk> _currentChunk; + SerialNum _lastSerial; + FastOS_ThreadPool & _threadPool; + Executor & _commitExecutor; + Executor & _sessionExecutor; + std::atomic<int> _sessionId; + vespalib::Monitor _syncMonitor; + bool _pendingSync; + vespalib::string _name; + DomainPartList _parts; + vespalib::Lock _lock; + vespalib::Monitor _currentChunkMonitor; + vespalib::Lock _sessionLock; + SessionList _sessions; + DurationSeconds _maxSessionRunTime; + vespalib::string _baseDir; const common::FileHeaderContext &_fileHeaderContext; - bool _markedDeleted; + bool _markedDeleted; + FastOS_ThreadInterface * _self; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 35bdc71c963..f140fb93b96 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -2,7 +2,6 @@ #include "domainpart.h" #include <vespa/vespalib/util/crc.h> -#include <vespa/vespalib/xxhash/xxhash.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/data/fileheader.h> #include <vespa/searchlib/common/fileheadercontext.h> @@ -27,37 +26,26 @@ namespace search::transactionlog { namespace { -void -handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); +constexpr size_t TARGET_PACKET_SIZE = 0x3f000; string -handleWriteError(const char *text, - FastOS_FileInterface &file, - int64_t lastKnownGoodPos, - const Packet::Entry &entry, - int bufLen) __attribute__ ((noinline)); +handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, + SerialNumRange range, int bufLen) __attribute__ ((noinline)); bool -handleReadError(const char *text, - FastOS_FileInterface &file, - ssize_t len, - ssize_t rlen, - int64_t lastKnownGoodPos, - bool allowTruncate) __attribute__ ((noinline)); +handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, + int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline)); -bool -addPacket(Packet &packet, - const Packet::Entry &e) __attribute__ ((noinline)); +void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); +void addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline)); +bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); -bool -tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); - -bool +void addPacket(Packet &packet, const Packet::Entry &e) { LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes", e.serial(), e.type(), e.data().size(), packet.size(), packet.sizeBytes()); - return ! packet.add(e); + packet.add(e); } void @@ -72,21 +60,18 @@ handleSync(FastOS_FileInterface &file) } string -handleWriteError(const char *text, - FastOS_FileInterface &file, - int64_t lastKnownGoodPos, - const Packet::Entry &entry, - int bufLen) +handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, + SerialNumRange range, int bufLen) { string last(FastOS_File::getLastErrorString()); - string e(make_string("%s. File '%s' at position %" PRId64 " for entry %" PRIu64 " of length %u. " - "OS says '%s'. Rewind to last known good position %" PRId64 ".", - text, file.GetFileName(), file.GetPosition(), entry.serial(), bufLen, + string e(make_string("%s. File '%s' at position %" PRId64 " for entries [%zu, %zu] of length %u. " + "OS says '%s'. Rewind to last known good position %zu.", + text, file.GetFileName(), file.GetPosition(), range.from(), range.to(), bufLen, last.c_str(), lastKnownGoodPos)); LOG(error, "%s", e.c_str()); if ( ! file.SetPosition(lastKnownGoodPos) ) { last = FastOS_File::getLastErrorString(); - throw runtime_error(make_string("Failed setting position %" PRId64 " of file '%s' of size %" PRId64 ": OS says '%s'", + throw runtime_error(make_string("Failed setting position %zu of file '%s' of size %zd : OS says '%s'", lastKnownGoodPos, file.GetFileName(), file.GetSize(), last.c_str())); } handleSync(file); @@ -118,12 +103,8 @@ tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) } bool -handleReadError(const char *text, - FastOS_FileInterface &file, - ssize_t len, - ssize_t rlen, - int64_t lastKnownGoodPos, - bool allowTruncate) +handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, + int64_t lastKnownGoodPos, bool allowTruncate) { bool retval(true); if (rlen != -1) { @@ -178,6 +159,43 @@ handleReadError(const char *text, } +Packet +DomainPart::readPacket(FastOS_FileInterface & transLog, SerialNumRange wanted, size_t targetSize, bool allowTruncate) { + Alloc buf; + Packet packet(targetSize); + int64_t fSize(transLog.GetSize()); + int64_t currPos(transLog.GetPosition()); + for(size_t i(0); (packet.sizeBytes() < targetSize) && (currPos < fSize) && (packet.range().to() < wanted.to()); i++) { + IChunk::UP chunk; + if (read(transLog, chunk, buf, allowTruncate)) { + if (chunk) { + try { + for (const Packet::Entry & e : chunk->getEntries()) { + if ((wanted.from() < e.serial()) && (e.serial() <= wanted.to())) { + addPacket(packet, e); + } + } + } catch (const std::exception & ex) { + throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } else { + throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } else { + if (transLog.GetSize() != fSize) { + fSize = transLog.GetSize(); + } else { + throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } + currPos = transLog.GetPosition(); + } + return packet; +} + int64_t DomainPart::buildPacketMapping(bool allowTruncate) { @@ -208,66 +226,35 @@ DomainPart::buildPacketMapping(bool allowTruncate) handleReadError("file header", transLog, 0, FileHeader::getMinSize(), 0, allowTruncate); } } + const SerialNumRange all(0, std::numeric_limits<SerialNum>::max()); while ((currPos < fSize)) { - Packet packet; - SerialNum firstSerial(0); - SerialNum lastSerial(0); - int64_t firstPos(currPos); - bool full(false); - Alloc buf; - for(size_t i(0); !full && (currPos < fSize); i++) { - Packet::Entry e; - if (read(transLog, e, buf, allowTruncate)) { - if (e.valid()) { - if (i == 0) { - firstSerial = e.serial(); - if (currPos == _headerLen) { - _range.from(firstSerial); - } - } - try { - full = addPacket(packet, e); - if ( ! full ) { - lastSerial = e.serial(); - currPos = transLog.GetPosition(); - _sz++; - } else { - transLog.SetPosition(currPos); - } - } catch (const std::exception & ex) { - throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } else { - throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } else { - if (transLog.GetSize() != fSize) { - fSize = transLog.GetSize(); - } else { - throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } - } - packet.close(); + const int64_t firstPos(currPos); + Packet packet = readPacket(transLog, all, TARGET_PACKET_SIZE, allowTruncate); if (!packet.empty()) { - _packets[firstSerial] = packet; - _range.to(lastSerial); + _sz += packet.size(); + const SerialNum firstSerial = packet.range().from(); + if (currPos == _headerLen) { + _range.from(firstSerial); + } + _range.to(packet.range().to()); + _packets.insert(std::make_pair(firstSerial, std::move(packet))); { LockGuard guard(_lock); _skipList.push_back(SkipInfo(firstSerial, firstPos)); } + } else { + fSize = transLog.GetSize(); } + currPos = transLog.GetPosition(); } transLog.Close(); return currPos; } -DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Crc defaultCrc, - const FileHeaderContext &fileHeaderContext, bool allowTruncate) : - _defaultCrc(defaultCrc), +DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, + uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) : + _encoding(encoding), + _compressionLevel(compressionLevel), _lock(), _fileLock(), _range(s), @@ -411,10 +398,12 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); + IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { - write(*_transLog, entry); + chunk->add(entry); + write(*_transLog, *chunk); _sz++; _range.to(entry.serial()); } else { @@ -428,17 +417,15 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) if ( ! _packets.empty() ) { Packet & lastPacket = _packets.rbegin()->second; if (lastPacket.sizeBytes() < 0xf000) { - if ( ! (merged = lastPacket.merge(packet)) ) { - LOG(error, "Failed merging packet [%" PRIu64 ", %" PRIu64 "] with [%" PRIu64 ", %" PRIu64 "]", - lastPacket.range().from(), lastPacket.range().to(), - packet.range().from(), packet.range().to()); - } + lastPacket.merge(packet); + merged = true; } } if (! merged ) { - _packets[firstSerial] = packet; + _packets.insert(std::make_pair(firstSerial, std::move(packet))); _skipList.push_back(SkipInfo(firstSerial, firstPos)); } + sync(); } void DomainPart::sync() @@ -506,23 +493,17 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) if (e.serial() <= r.to()) { LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes", e.serial(), e.type(), e.data().size(), newPacket.size(), newPacket.sizeBytes()); - if (newPacket.add(e)) { - r.from(e.serial()); - } else { - throw runtime_error("Could not add entry to packet. Here is some mumbo jumbo. Fix."); - } + newPacket.add(e); + r.from(e.serial()); } else { // Force breakout on visiting empty interval. r.from(r.to()); } } } - newPacket.close(); - packet = newPacket; + packet = std::move(newPacket); retval = next != _packets.end(); } - } else { - packet.close(); } } else { /// File has been closed must continue from file. @@ -539,131 +520,86 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet) if ( ! file.IsOpened() ) { retval = openAndFind(file, r.from() + 1); } - if (retval) { - Packet newPacket; - Alloc buf; - for (bool full(false);!full && retval && (r.from() < r.to());) { - Packet::Entry e; - int64_t fPos = file.GetPosition(); - retval = read(file, e, buf, false); - if (retval && - e.valid() && - (r.from() < e.serial()) && - (e.serial() <= r.to())) { - try { - full = addPacket(newPacket, e); - } catch (const std::exception & ex) { - throw runtime_error(make_string("%s : Failed creating packet for visit %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - ex.what(), file.GetFileName(), file.GetSize(), fPos, file.GetPosition())); - } - if ( !full ) { - r.from(e.serial()); - } else { - if ( ! file.SetPosition(fPos) ) { - throw runtime_error(make_string("Failed setting read position for file '%s' of size %" PRId64 " from %" PRId64 " to %" PRId64 ".", - file.GetFileName(), file.GetSize(), file.GetPosition(), fPos)); - } - } - } - } - newPacket.close(); - packet = newPacket; + if ( ! retval) { + return false; } - return retval; + packet = readPacket(file, r, TARGET_PACKET_SIZE, false); + if (!packet.empty()) { + r.from(packet.range().to()); + } + + return !packet.empty(); } void -DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) +DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk) { - int64_t lastKnownGoodPos(file.GetPosition()); - int32_t crc(0); - uint32_t len(entry.serializedSize() + sizeof(crc)); nbostream os; - os << static_cast<uint8_t>(_defaultCrc); - os << len; - size_t start(os.size()); - entry.serialize(os); - size_t end(os.size()); - crc = calcCrc(_defaultCrc, os.c_str()+start, end - start); - os << crc; - size_t osSize = os.size(); - assert(osSize == len + sizeof(len) + sizeof(uint8_t)); + size_t begin = os.wp(); + os << _encoding.getRaw(); + os << uint32_t(0); + Encoding realEncoding = chunk.encode(os); + size_t end = os.wp(); + os.wp(0); + os << realEncoding.getRaw(); + os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t))); + os.wp(end); + int64_t lastKnownGoodPos(file.GetPosition()); LockGuard guard(_writeLock); - if ( ! file.CheckedWrite(os.c_str(), osSize) ) { - throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, entry, end - start)); + if ( ! file.CheckedWrite(os.c_str(), os.size()) ) { + throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, chunk.range(), os.size())); } - _writtenSerial = entry.serial(); - _byteSize.store(lastKnownGoodPos + osSize, std::memory_order_release); + _writtenSerial = chunk.range().to(); + _byteSize.store(lastKnownGoodPos + os.size(), std::memory_order_release); } bool -DomainPart::read(FastOS_FileInterface &file, - Packet::Entry &entry, - Alloc & buf, - bool allowTruncate) +DomainPart::read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc & buf, bool allowTruncate) { - bool retval(true); char tmp[5]; int64_t lastKnownGoodPos(file.GetPosition()); size_t rlen = file.Read(tmp, sizeof(tmp)); nbostream his(tmp, sizeof(tmp)); - uint8_t version(-1); + uint8_t encoding(-1); uint32_t len(0); - his >> version >> len; - if ((retval = (rlen == sizeof(tmp)))) { - if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) { - string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," - " got %d from '%s' at position %ld", - version, file.GetFileName(), lastKnownGoodPos)); - if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { - LOG(warning, "%s", msg.c_str()); - return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); - } else { - throw runtime_error(msg); - } - } - if (len > buf.size()) { - Alloc::alloc(len).swap(buf); - } - rlen = file.Read(buf.get(), len); - retval = rlen == len; - if (!retval) { - retval = handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate); - } else { - nbostream_longlivedbuf is(buf.get(), len); - entry.deserialize(is); - int32_t crc(0); - is >> crc; - int32_t crcVerify(calcCrc(static_cast<Crc>(version), buf.get(), len - sizeof(crc))); - if (crc != crcVerify) { - throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d", - file.GetFileName(), file.GetPosition() - len - sizeof(len), - static_cast<int>(len), static_cast<int>(crcVerify), static_cast<int>(crc))); - } - } - } else { - if (rlen == 0) { - // Eof + his >> encoding >> len; + if (rlen != sizeof(tmp)) { + return (rlen == 0) + ? true + : handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate); + } + + try { + chunk = IChunk::create(encoding); + } catch (const std::exception & e) { + string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," + " got %d from '%s' at position %ld", + encoding, file.GetFileName(), lastKnownGoodPos)); + if ((encoding == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { + LOG(warning, "%s", msg.c_str()); + return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); } else { - retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate); + throw runtime_error(msg); } } - return retval; -} - -int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz) -{ - if (version == xxh64) { - return static_cast<int32_t>(XXH64(buf, sz, 0ll)); - } else if (version == ccitt_crc32) { - vespalib::crc_32_type calculator; - calculator.process_bytes(buf, sz); - return calculator.checksum(); - } else { - abort(); + if (len > buf.size()) { + Alloc::alloc(len).swap(buf); } + rlen = file.Read(buf.get(), len); + if (rlen != len) { + return handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate); + } + try { + nbostream_longlivedbuf is(buf.get(), len); + chunk->decode(is); + } catch (const std::exception & e) { + throw runtime_error(make_string("Got exception during decoding of packet '%s' from file '%s' (len pos=%" PRId64 ", len=%d)", + e.what(), file.GetFileName(), file.GetPosition() - len - sizeof(len), static_cast<int>(len))); + } + + return true; } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 59d0df6df94..5256b731125 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -2,6 +2,7 @@ #pragma once #include "common.h" +#include "ichunk.h" #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/memory.h> #include <map> @@ -19,13 +20,9 @@ private: DomainPart& operator=(const DomainPart &); public: - enum Crc { - ccitt_crc32=1, - xxh64=2 - }; typedef std::shared_ptr<DomainPart> SP; - DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc, - const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); + DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, + uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); ~DomainPart(); @@ -49,13 +46,13 @@ public: } bool isClosed() const; private: + using Alloc = vespalib::alloc::Alloc; bool openAndFind(FastOS_FileInterface &file, const SerialNum &from); int64_t buildPacketMapping(bool allowTruncate); + static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate); + static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate); - static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate); - - void write(FastOS_FileInterface &file, const Packet::Entry &entry); - static int32_t calcCrc(Crc crc, const void * buf, size_t len); + void write(FastOS_FileInterface &file, const IChunk & entry); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo @@ -77,21 +74,22 @@ private: }; typedef std::vector<SkipInfo> SkipList; typedef std::map<SerialNum, Packet> PacketList; - const Crc _defaultCrc; - vespalib::Lock _lock; - vespalib::Lock _fileLock; - SerialNumRange _range; - size_t _sz; + const Encoding _encoding; + const uint8_t _compressionLevel; + vespalib::Lock _lock; + vespalib::Lock _fileLock; + SerialNumRange _range; + size_t _sz; std::atomic<uint64_t> _byteSize; - PacketList _packets; - vespalib::string _fileName; + PacketList _packets; + vespalib::string _fileName; std::unique_ptr<FastOS_FileInterface> _transLog; - SkipList _skipList; - uint32_t _headerLen; - vespalib::Lock _writeLock; + SkipList _skipList; + uint32_t _headerLen; + vespalib::Lock _writeLock; // Protected by _writeLock - SerialNum _writtenSerial; - SerialNum _syncedSerial; + SerialNum _writtenSerial; + SerialNum _syncedSerial; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp new file mode 100644 index 00000000000..af7d396c0e4 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp @@ -0,0 +1,108 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "chunks.h" +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/crc.h> +#include <vespa/vespalib/xxhash/xxhash.h> +#include <cassert> + +using std::runtime_error; +using std::make_unique; +using vespalib::make_string; +using vespalib::nbostream_longlivedbuf; +using vespalib::compression::CompressionConfig; + +namespace search::transactionlog { + +Encoding::Encoding(Crc crc, Compression compression) + : _raw(crc | (compression >> 2)) +{ + assert(crc <= Crc::xxh64); + assert(compression <= Compression::lz4); +} + +IChunk::~IChunk() = default; + +void +IChunk::add(const Packet::Entry & entry) { + _entries.emplace_back(entry); +} + +SerialNumRange +IChunk::range() const { + return _entries.empty() + ? SerialNumRange() + : SerialNumRange(_entries.front().serial(), _entries.back().serial()); +} + +void +IChunk::deserializeEntries(nbostream & is) { + while (is.good() && !is.empty()) { + Packet::Entry e; + e.deserialize(is); + add(e); + } +} + +void +IChunk::serializeEntries(nbostream & os) const { + for (const auto & e : _entries) { + e.serialize(os); + } +} + +Encoding +IChunk::encode(nbostream & os) const { + return onEncode(os); +} + +void +IChunk::decode(nbostream & is) { + onDecode(is); +} + +IChunk::UP +IChunk::create(uint8_t chunkType) { + return create(Encoding(chunkType), 9); +} +IChunk::UP +IChunk::create(Encoding encoding, uint8_t compressionLevel) { + switch (encoding.getCrc()) { + case Encoding::Crc::xxh64: + switch (encoding.getCompression()) { + case Encoding::Compression::none: + return make_unique<XXH64None>(); + case Encoding::Compression::lz4: + return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel); + case Encoding::Compression::zstd: + return make_unique<XXH64Compressed>(CompressionConfig::ZSTD, compressionLevel); + default: + return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel); + } + case Encoding::Crc::ccitt_crc32: + switch (encoding.getCompression()) { + case Encoding::Compression::none: + return make_unique<CCITTCRC32None>(); + default: + throw runtime_error(make_string("Unhandled compression type '%d' for ccitt_crc32 compression", + encoding.getCompression())); + } + default: + throw runtime_error(make_string("Unhandled crc type '%d'", encoding.getCrc())); + } +} + +int32_t Encoding::calcCrc(Crc version, const void * buf, size_t sz) +{ + if (version == xxh64) { + return static_cast<int32_t>(XXH64(buf, sz, 0ll)); + } else if (version == ccitt_crc32) { + vespalib::crc_32_type calculator; + calculator.process_bytes(buf, sz); + return calculator.checksum(); + } else { + abort(); + } +} + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h new file mode 100644 index 00000000000..0e913703468 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -0,0 +1,57 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "common.h" +#include <memory> + +namespace vespalib { class nbostream; } + +namespace search::transactionlog { + +class Encoding { +public: + enum Crc { + nocrc = 0, + ccitt_crc32 = 1, + xxh64 = 2 + }; + enum Compression { + none = 0, + lz4 = 1, + zstd = 2 + }; + Encoding(uint8_t raw) : _raw(raw) {} + Encoding(Crc crc, Compression compression); + Crc getCrc() const { return Crc(_raw & 0x3); } + Compression getCompression() const { return Compression((_raw >> 2) & 0xf); } + static int32_t calcCrc(Crc version, const void * buf, size_t sz); + uint8_t getRaw() const { return _raw; } +private: + uint8_t _raw; +}; + +class IChunk { +public: + using UP = std::unique_ptr<IChunk>; + using Entries = std::vector<Packet::Entry>; + using nbostream = vespalib::nbostream; + using ConstBufferRef = vespalib::ConstBufferRef; + virtual ~IChunk(); + const Entries & getEntries() const { return _entries; } + void add(const Packet::Entry & entry); + Encoding encode(nbostream & os) const; + void decode(nbostream & buf); + static UP create(uint8_t chunkType); + static UP create(Encoding chunkType, uint8_t compressionLevel); + SerialNumRange range() const; +protected: + virtual Encoding onEncode(nbostream & os) const = 0; + virtual void onDecode(nbostream & is) = 0; + void deserializeEntries(nbostream & is); + void serializeEntries(nbostream & os) const; +private: + Entries _entries; +}; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index cbcbc68fdff..56aaa162485 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -36,7 +36,7 @@ Session::VisitTask::run() bool Session::visit(FastOS_FileInterface & file, DomainPart & dp) { - Packet packet; + Packet packet(size_t(-1)); bool more(false); if (dp.isClosed()) { more = dp.visit(file, _range, packet); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 4c3c5609a93..f724f5035c8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> @@ -13,6 +14,9 @@ using vespalib::make_string; using vespalib::stringref; using vespalib::IllegalArgumentException; using search::common::FileHeaderContext; +using std::make_shared; +using std::runtime_error; +using namespace std::chrono_literals; namespace search::transactionlog { @@ -26,21 +30,16 @@ class SyncHandler : public FNET_Task SerialNum _syncTo; public: - SyncHandler(FRT_Supervisor *supervisor, - FRT_RPCRequest *req,const Domain::SP &domain, - const TransLogServer::Session::SP &session, - SerialNum syncTo); + SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo); ~SyncHandler(); void PerformTask() override; }; -SyncHandler::SyncHandler(FRT_Supervisor *supervisor, - FRT_RPCRequest *req, - const Domain::SP &domain, - const TransLogServer::Session::SP &session, - SerialNum syncTo) +SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo) : FNET_Task(supervisor->GetScheduler()), _req(*req), _domain(domain), @@ -50,9 +49,7 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, } -SyncHandler::~SyncHandler() -{ -} +SyncHandler::~SyncHandler() = default; void @@ -77,25 +74,25 @@ SyncHandler::PerformTask() TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, + DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none)) + .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 1ms)) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, DomainPart::Crc::xxh64) + const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize, - size_t maxThreads, DomainPart::Crc defaultCrcType) + const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads) : FRT_Invokable(), _name(name), _baseDir(baseDir), - _domainPartSize(domainPartSize), - _defaultCrcType(defaultCrcType), + _domainConfig(cfg), _commitExecutor(maxThreads, 128*1024), _sessionExecutor(maxThreads, 128*1024), - _threadPool(8192, 1), + _threadPool(0x20000), _supervisor(std::make_unique<FRT_Supervisor>()), _domains(), _reqQ(), @@ -110,8 +107,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con domainDir >> domainName; if ( ! domainName.empty()) { try { - auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultCrcType,_fileHeaderContext); + auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, + _sessionExecutor, cfg,_fileHeaderContext); _domains[domain->name()] = domain; } catch (const std::exception & e) { LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what()); @@ -132,13 +129,13 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con } } if ( ! listenOk ) { - throw std::runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec)); + throw runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec)); } } else { - throw std::runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno)); + throw runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno)); } } else { - throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); + throw runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); } start(_threadPool); } @@ -200,7 +197,7 @@ void TransLogServer::run() } } logMetric(); - } while (running() && !(hasPacket && (req == NULL))); + } while (running() && !(hasPacket && (req == nullptr))); LOG(info, "TLS Stopped"); } @@ -215,6 +212,17 @@ void TransLogServer::logMetric() const } } + +TransLogServer & +TransLogServer::setDomainConfig(const DomainConfig & cfg) { + Guard domainGuard(_lock); + _domainConfig = cfg; + for(auto &domain: _domains) { + domain.second->setConfig(cfg); + } + return *this; +} + DomainStats TransLogServer::getDomainStats() const { @@ -349,8 +357,8 @@ void TransLogServer::createDomain(FRT_RPCRequest *req) Domain::SP domain(findDomain(domainName)); if ( !domain ) { try { - domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultCrcType, _fileHeaderContext); + domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, + _sessionExecutor, _domainConfig, _fileHeaderContext); { Guard domainGuard(_lock); _domains[domain->name()] = domain; @@ -462,7 +470,7 @@ void TransLogServer::commit(const vespalib::string & domainName, const Packet & (void) done; Domain::SP domain(findDomain(domainName)); if (domain) { - domain->commit(packet); + domain->commit(packet, std::move(done)); } else { throw IllegalArgumentException("Could not find domain " + domainName); } @@ -478,7 +486,9 @@ void TransLogServer::domainCommit(FRT_RPCRequest *req) if (domain) { Packet packet(params[1]._data._buf, params[1]._data._len); try { - domain->commit(packet); + vespalib::Gate gate; + domain->commit(packet, make_shared<GateCallback>(gate)); + gate.await(); ret.AddInt32(0); ret.AddString("ok"); } catch (const std::exception & e) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index d78d3d39887..83627d2d3e3 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -22,16 +22,15 @@ public: typedef std::shared_ptr<TransLogServer> SP; TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, - uint64_t domainPartSize, size_t maxThreads, DomainPart::Crc defaultCrc); + const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize); + const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; - void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; + TransLogServer & setDomainConfig(const DomainConfig & cfg); class Session { @@ -79,8 +78,7 @@ private: vespalib::string _name; vespalib::string _baseDir; - const uint64_t _domainPartSize; - const DomainPart::Crc _defaultCrcType; + DomainConfig _domainConfig; vespalib::ThreadStackExecutor _commitExecutor; vespalib::ThreadStackExecutor _sessionExecutor; FastOS_ThreadPool _threadPool; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index ff4a402b438..de3d72331d6 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -7,12 +7,17 @@ LOG_SETUP(".translogserverapp"); using search::common::FileHeaderContext; +using namespace std::chrono_literals; namespace search::transactionlog { +using LockGuard = std::lock_guard<std::mutex>; +using std::make_unique; + TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, const FileHeaderContext & fileHeaderContext) - : _tls(), + : _lock(), + _tls(), _tlsConfig(), _tlsConfigFetcher(tlsConfigUri.getContext()), _fileHeaderContext(fileHeaderContext) @@ -23,24 +28,58 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, namespace { -DomainPart::Crc getCrc(searchlib::TranslogserverConfig::Crcmethod crcType) + +Encoding::Crc +getCrc(searchlib::TranslogserverConfig::Crcmethod type) { - switch (crcType) { + switch (type) { case searchlib::TranslogserverConfig::ccitt_crc32: - return DomainPart::ccitt_crc32; + return Encoding::Crc::ccitt_crc32; case searchlib::TranslogserverConfig::xxh64: - return DomainPart::xxh64; + return Encoding::Crc::xxh64; + } + return Encoding::Crc::xxh64; +} + +Encoding::Compression +getCompression(searchlib::TranslogserverConfig::Compression::Type type) +{ + switch (type) { + case searchlib::TranslogserverConfig::Compression::NONE: + return Encoding::Compression::none; + case searchlib::TranslogserverConfig::Compression::LZ4: + return Encoding::Compression::lz4; + case searchlib::TranslogserverConfig::Compression::ZSTD: + return Encoding::Compression::zstd; } - abort(); + return Encoding::Compression::lz4; +} + +Encoding +getEncoding(const searchlib::TranslogserverConfig & cfg) +{ + return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type)); +} + +DomainConfig +getDomainConfig(const searchlib::TranslogserverConfig & cfg) { + DomainConfig dcfg; + dcfg.setEncoding(getEncoding(cfg)) + .setCompressionLevel(cfg.compression.level) + .setPartSizeLimit(cfg.filesizemax) + .setChunkSizeLimit(cfg.chunk.sizelimit) + .setChunkAgeLimit(std::chrono::microseconds(int64_t(cfg.chunk.agelimit*1000000))); + return dcfg; } } void TransLogServerApp::start() { - std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get(); - _tls.reset(new TransLogServer(c->servername, c->listenport, c->basedir, _fileHeaderContext, - c->filesizemax, c->maxthreads, getCrc(c->crcmethod))); + LockGuard guard(_lock); + auto c = _tlsConfig.get(); + _tls = make_unique<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, + getDomainConfig(*c), c->maxthreads); } TransLogServerApp::~TransLogServerApp() @@ -51,8 +90,12 @@ TransLogServerApp::~TransLogServerApp() void TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) { LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport); + LockGuard guard(_lock); _tlsConfig.set(cfg.release()); _tlsConfig.latch(); + if (_tls) { + _tls->setDomainConfig(getDomainConfig(*cfg)); + } } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index 35fa994d1e4..ea6d0158cec 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -14,6 +14,7 @@ namespace search::transactionlog { class TransLogServerApp : public config::IFetcherCallback<searchlib::TranslogserverConfig> { private: + std::mutex _lock; TransLogServer::SP _tls; vespalib::PtrHolder<searchlib::TranslogserverConfig> _tlsConfig; config::ConfigFetcher _tlsConfigFetcher; |