diff options
author | Arnstein Ressem <aressem@gmail.com> | 2020-09-02 20:54:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-02 20:54:35 +0200 |
commit | 8301699fbc42ac165c9e1ab09f3dbbc680181821 (patch) | |
tree | 0a66b82ddb9a099568b54f26cdf9b374fd9ad26a /searchlib | |
parent | 51f266785a4d6f1b3ac3e88ac897adae2ab94459 (diff) |
Revert "Configure compression and chunk size"
Diffstat (limited to 'searchlib')
12 files changed, 361 insertions, 543 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index d003bad0582..0dced597917 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -5,6 +5,7 @@ #include <vespa/vespalib/objects/identifiable.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/fastos/file.h> +#include <map> #include <vespa/log/log.h> LOG_SETUP("translogclient_test"); @@ -13,24 +14,9 @@ using namespace search; using namespace transactionlog; using namespace document; using namespace vespalib; -using namespace std::chrono_literals; using search::index::DummyFileHeaderContext; -namespace { - -bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); -TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); -bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); -void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); -void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); -uint32_t countFiles(const vespalib::string &dir); -void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); -bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); -void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains); -void verifyDomain(const vespalib::string & name); - -vespalib::string -myhex(const void * b, size_t sz) +vespalib::string myhex(const void * b, size_t sz) { static const char * hextab="0123456789ABCDEF"; const unsigned char * c = static_cast<const unsigned char *>(b); @@ -43,6 +29,35 @@ myhex(const void * b, size_t sz) return s; } +class Test : public vespalib::TestApp +{ +public: + int Main() override; +private: + bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); + TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); + bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); + void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); + void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); + uint32_t countFiles(const vespalib::string &dir); + void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); + bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); + bool partialUpdateTest(); + bool testVisitOverGeneratedDomain(); + bool testRemove(); + void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains); + void verifyDomain(const vespalib::string & name); + void testCrcVersions(); + bool testVisitOverPreExistingDomain(); + void testMany(); + void testErase(); + void testSync(); + void testTruncateOnShortRead(); + void testTruncateOnVersionMismatch(); +}; + +TEST_APPHOOK(Test); + class CallBackTest : public TransLogClient::Visitor::Callback { private: @@ -60,8 +75,7 @@ public: bool _eof; }; -RPC::Result -CallBackTest::receive(const Packet & p) +RPC::Result CallBackTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(h.peek(), h.size()).c_str()); @@ -87,8 +101,7 @@ public: size_t _value; }; -RPC::Result -CallBackManyTest::receive(const Packet & p) +RPC::Result CallBackManyTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); for(;h.size() > 0; _count++, _value++) { @@ -120,8 +133,7 @@ public: }; -RPC::Result -CallBackUpdate::receive(const Packet & packet) +RPC::Result CallBackUpdate::receive(const Packet & packet) { nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size()); while (h.size() > 0) { @@ -173,8 +185,7 @@ public: SerialNum _prevSerial; }; -RPC::Result -CallBackStatsTest::receive(const Packet & p) +RPC::Result CallBackStatsTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); for(;h.size() > 0; ++_count) { @@ -210,10 +221,67 @@ public: IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable); -constexpr size_t DEFAULT_PACKET_SIZE = 0xf000; +bool Test::partialUpdateTest() +{ + bool retval(false); + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); + TransLogClient::Session & session = *s1; + + TestIdentifiable du; + + nbostream os; + os << du; + + vespalib::ConstBufferRef bb(os.data(), os.size()); + LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); + Packet::Entry e(7, du.getClass().id(), bb); + Packet pa; + pa.add(e); + pa.close(); + ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size()))); + + CallBackUpdate ca; + TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(5, 7) ); + for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + ASSERT_TRUE( ca.map().size() == 1); + ASSERT_TRUE( ca.hasSerial(7) ); + + CallBackUpdate ca1; + TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); + ASSERT_TRUE(visitor1.get()); + ASSERT_TRUE( visitor1->visit(4, 5) ); + for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca1._eof ); + ASSERT_TRUE( ca1.map().size() == 0); + + CallBackUpdate ca2; + TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); + ASSERT_TRUE(visitor2.get()); + ASSERT_TRUE( visitor2->visit(5, 6) ); + for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca2._eof ); + ASSERT_TRUE( ca2.map().size() == 0); + + CallBackUpdate ca3; + TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); + ASSERT_TRUE(visitor3.get()); + ASSERT_TRUE( visitor3->visit(5, 1000) ); + for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca3._eof ); + ASSERT_TRUE( ca3.map().size() == 1); + ASSERT_TRUE( ca3.hasSerial(7) ); + + return retval; +} -bool -createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) +bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) { bool retval(true); std::vector<vespalib::string> dir; @@ -230,40 +298,43 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre return retval; } -TransLogClient::Session::UP -openDomainTest(TransLogClient & tls, const vespalib::string & name) +TransLogClient::Session::UP Test::openDomainTest(TransLogClient & tls, const vespalib::string & name) { TransLogClient::Session::UP s1 = tls.open(name); ASSERT_TRUE (s1.get() != NULL); return s1; } -bool -fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) +bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) { bool retval(true); Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20)); Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20)); Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20)); - Packet a(DEFAULT_PACKET_SIZE); - a.add(e1); - Packet b(DEFAULT_PACKET_SIZE); - b.add(e2); - b.add(e3); - EXPECT_FALSE(b.add(e1)); + Packet a; + ASSERT_TRUE (a.add(e1)); + Packet b; + ASSERT_TRUE (b.add(e2)); + ASSERT_TRUE (b.add(e3)); + ASSERT_TRUE (!b.add(e1)); + a.close(); + b.close(); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size()))); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size()))); - EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())), - std::runtime_error, - "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."); + try { + s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())); + ASSERT_TRUE(false); + } catch (const std::exception & e) { + EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what()); + } EXPECT_EQUAL(a.size(), 1u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 1u); EXPECT_EQUAL(b.size(), 2u); EXPECT_EQUAL(b.range().from(), 2u); EXPECT_EQUAL(b.range().to(), 3u); - a.merge(b); + EXPECT_TRUE(a.merge(b)); EXPECT_EQUAL(a.size(), 3u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 3u); @@ -278,82 +349,52 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) return retval; } -void -fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) +void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) { size_t value(0); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); + std::unique_ptr<Packet> p(new Packet()); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - p->add(e); - if (p->sizeBytes() > DEFAULT_PACKET_SIZE){ + if ( ! p->add(e) ) { + p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); - p.reset(new Packet(DEFAULT_PACKET_SIZE)); + p.reset(new Packet()); + ASSERT_TRUE(p->add(e)); } } + p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); } } -using Counter = std::atomic<size_t>; - -class CountDone : public IDestructorCallback { -public: - CountDone(Counter & inFlight) : _inFlight(inFlight) { ++_inFlight; } - ~CountDone() override { --_inFlight; } -private: - Counter & _inFlight; -}; - -void -fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries) -{ - size_t value(0); - Counter inFlight(0); - for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); - for(size_t j=0; j < numEntries; j++, value++) { - Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - p->add(e); - if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { - s1.commit(domain, *p, std::make_shared<CountDone>(inFlight)); - p.reset(new Packet(DEFAULT_PACKET_SIZE)); - } - } - s1.commit(domain, *p, std::make_shared<CountDone>(inFlight)); - LOG(info, "Inflight %ld", inFlight.load()); - } - while (inFlight.load() != 0) { - std::this_thread::sleep_for(10ms); - LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load()); - } - -} - void -fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) +Test::fillDomainTest(TransLogClient::Session * s1, + size_t numPackets, size_t numEntries, + size_t entrySize) { size_t value(0); std::vector<char> entryBuffer(entrySize); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); + std::unique_ptr<Packet> p(new Packet()); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size())); - p->add(e); - if (p->sizeBytes() > DEFAULT_PACKET_SIZE){ + if ( ! p->add(e) ) { + p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); - p.reset(new Packet(DEFAULT_PACKET_SIZE)); + p.reset(new Packet()); + ASSERT_TRUE(p->add(e)); } } + p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); } } uint32_t -countFiles(const vespalib::string &dir) +Test::countFiles(const vespalib::string &dir) { uint32_t res = 0; FastOS_DirectoryScan dirScan(dir.c_str()); @@ -367,8 +408,10 @@ countFiles(const vespalib::string &dir) return res; } + void -checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) +Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, + size_t numEntries) { SerialNum b(0), e(0); size_t c(0); @@ -378,8 +421,8 @@ checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) EXPECT_EQUAL(c, numEntries); } -bool -visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) + +bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) { bool retval(true); @@ -443,31 +486,10 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain) return tls.getDomainStats()[domain].maxSessionRunTime.count(); } -void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains) +bool Test::testVisitOverGeneratedDomain() { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, - DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4); - TransLogClient tls("tcp/localhost:18377"); - - createDomainTest(tls, name, preExistingDomains); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); - fillDomainTest(s1.get(), name); -} - -void verifyDomain(const vespalib::string & name) { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); - visitDomainTest(tls, s1.get(), name); -} - -} - -TEST("testVisitOverGeneratedDomain") { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -479,85 +501,42 @@ TEST("testVisitOverGeneratedDomain") { double maxSessionRunTime = getMaxSessionRunTime(tlss, "test1"); LOG(info, "testVisitOverGeneratedDomain(): maxSessionRunTime=%f", maxSessionRunTime); EXPECT_GREATER(maxSessionRunTime, 0); + return true; } -TEST("testVisitOverPreExistingDomain") { - // Depends on Test::testVisitOverGeneratedDomain() +void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains) +{ DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, crcMethod); TransLogClient tls("tcp/localhost:18377"); - vespalib::string name("test1"); + createDomainTest(tls, name, preExistingDomains); TransLogClient::Session::UP s1 = openDomainTest(tls, name); - visitDomainTest(tls, s1.get(), name); + fillDomainTest(s1.get(), name); } -TEST("partialUpdateTest") { +void Test::verifyDomain(const vespalib::string & name) +{ DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls("tcp/localhost:18377"); - - TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); - TransLogClient::Session & session = *s1; - - TestIdentifiable du; - - nbostream os; - os << du; - - vespalib::ConstBufferRef bb(os.data(), os.size()); - LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); - Packet::Entry e(7, du.getClass().id(), bb); - Packet pa(DEFAULT_PACKET_SIZE); - pa.add(e); - ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size()))); - - CallBackUpdate ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(5, 7) ); - for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - ASSERT_TRUE( ca.map().size() == 1); - ASSERT_TRUE( ca.hasSerial(7) ); - - CallBackUpdate ca1; - TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); - ASSERT_TRUE(visitor1.get()); - ASSERT_TRUE( visitor1->visit(4, 5) ); - for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca1._eof ); - ASSERT_TRUE( ca1.map().size() == 0); - - CallBackUpdate ca2; - TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); - ASSERT_TRUE(visitor2.get()); - ASSERT_TRUE( visitor2->visit(5, 6) ); - for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca2._eof ); - ASSERT_TRUE( ca2.map().size() == 0); - - CallBackUpdate ca3; - TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); - ASSERT_TRUE(visitor3.get()); - ASSERT_TRUE( visitor3->visit(5, 1000) ); - for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca3._eof ); - ASSERT_TRUE( ca3.map().size() == 1); - ASSERT_TRUE( ca3.hasSerial(7) ); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + visitDomainTest(tls, s1.get(), name); } -TEST("testCrcVersions") { - createAndFillDomain("ccitt_crc32", Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none), 0); - createAndFillDomain("xxh64", Encoding(Encoding::Crc::xxh64, Encoding::Compression::none), 1); +void Test::testCrcVersions() +{ + createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0); + createAndFillDomain("xxh64", DomainPart::xxh64, 1); verifyDomain("ccitt_crc32"); verifyDomain("xxh64"); } -TEST("testRemove") { +bool Test::testRemove() +{ DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test-delete"); @@ -566,6 +545,21 @@ TEST("testRemove") { fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); ASSERT_TRUE(tls.remove(name)); + + return true; +} + +bool Test::testVisitOverPreExistingDomain() +{ + // Depends on Test::testVisitOverGeneratedDomain() + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + + vespalib::string name("test1"); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + visitDomainTest(tls, s1.get(), name); + return true; } namespace { @@ -606,19 +600,18 @@ assertStatus(TransLogClient::Session &s, } -TEST("test sending a lot of data") { +void Test::testMany() +{ const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; - const vespalib::string MANY("many"); { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000) - .setChunkAgeLimit(100us)); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000); TransLogClient tls("tcp/localhost:18377"); - createDomainTest(tls, MANY, 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + createDomainTest(tls, "many", 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); @@ -637,7 +630,7 @@ TEST("test sending a lot of data") { } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); @@ -648,79 +641,7 @@ TEST("test sending a lot of data") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); - } - { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); - - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); - SerialNum b(0), e(0); - size_t c(0); - EXPECT_TRUE(s1->status(b, e, c)); - EXPECT_EQUAL(b, 1u); - EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); - CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); - } -} - -TEST("test sending a lot of data async") { - const unsigned int NUM_PACKETS = 1000; - const unsigned int NUM_ENTRIES = 100; - const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; - const vespalib::string MANY("many-async"); - { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000) - .setChunkAgeLimit(10ms)); - TransLogClient tls("tcp/localhost:18377"); - createDomainTest(tls, MANY, 1); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); - fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES); - SerialNum b(0), e(0); - size_t c(0); - EXPECT_TRUE(s1->status(b, e, c)); - - EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); - CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); - } - { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); - - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); - SerialNum b(0), e(0); - size_t c(0); - EXPECT_TRUE(s1->status(b, e, c)); - EXPECT_EQUAL(b, 1u); - EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); - CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -730,16 +651,14 @@ TEST("test sending a lot of data async") { } } - - - -TEST("testErase") { +void Test::testErase() +{ const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); @@ -748,7 +667,7 @@ TEST("testErase") { } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); @@ -829,13 +748,16 @@ TEST("testErase") { } } -TEST("testSync") { + +void +Test::testSync() +{ const unsigned int NUM_PACKETS = 3; const unsigned int NUM_ENTRIES = 4; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -848,7 +770,10 @@ TEST("testSync") { EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES); } -TEST("test truncate on version mismatch") { + +void +Test::testTruncateOnVersionMismatch() +{ const unsigned int NUM_PACKETS = 3; const unsigned int NUM_ENTRIES = 4; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; @@ -857,7 +782,7 @@ TEST("test truncate on version mismatch") { size_t countOld(0); DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -878,7 +803,7 @@ TEST("test truncate on version mismatch") { EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp))); EXPECT_TRUE(f.Close()); { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); @@ -890,7 +815,9 @@ TEST("test truncate on version mismatch") { } } -TEST("test truncation after short read") { +void +Test::testTruncateOnShortRead() +{ const unsigned int NUM_PACKETS = 17; const unsigned int NUM_ENTRIES = 1; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; @@ -902,7 +829,7 @@ TEST("test truncation after short read") { DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); @@ -918,7 +845,7 @@ TEST("test truncation after short read") { EXPECT_EQUAL(2u, countFiles(dir)); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); @@ -934,7 +861,7 @@ TEST("test truncation after short read") { trfile.Close(); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); @@ -944,4 +871,28 @@ TEST("test truncation after short read") { } } -TEST_MAIN() { TEST_RUN_ALL(); } + +int Test::Main() +{ + TEST_INIT("translogclient_test"); + + if (_argc > 0) { + DummyFileHeaderContext::setCreator(_argv[0]); + } + testVisitOverGeneratedDomain(); + testVisitOverPreExistingDomain(); + testMany(); + testErase(); + partialUpdateTest(); + + testRemove(); + + testSync(); + + testTruncateOnShortRead(); + testTruncateOnVersionMismatch(); + + testCrcVersions(); + + TEST_DONE(); +} diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 925f297bf48..013ca81dcc9 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -698,7 +698,7 @@ TransLogStress::Main() // start transaction log server DummyFileHeaderContext fileHeaderContext; - TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); + TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize); TransLogClient client(tlsSpec); client.create(domain); diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index f822fc80fc1..74efe3fe68e 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -5,7 +5,7 @@ namespace=searchlib listenport int default=13700 restart ## Max file size (50M) -filesizemax int default=50000000 +filesizemax int default=50000000 restart ## Server name to identify server. servername string default="tls" restart @@ -22,17 +22,3 @@ maxthreads int default=4 restart ##Default crc method used crcmethod enum {ccitt_crc32, xxh64} default=xxh64 - -## Control compression type. -compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=LZ4 - -## Control compression level -## LZ4 has normal range 1..9 while ZSTD has range 1..19 -## 9 is a reasonable default for both -compression.level int default=9 - -## How large a chunk can grow in memory before beeing flushed -chunk.sizelimit int default = 256000 # 256k - -## How long a chunk can reside in memory befor ebeeing flushed to disk. -chunk.agelimit double default = 0.010 # 10 milliseconds diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 804a558789e..5e7cfc74199 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -4,14 +4,11 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/file.h> #include <algorithm> #include <thread> #include <vespa/log/log.h> -#include <vespa/vespalib/util/threadstackexecutor.h> - LOG_SETUP(".transactionlog.domain"); using vespalib::string; @@ -23,43 +20,29 @@ using vespalib::Monitor; using vespalib::MonitorGuard; using search::common::FileHeaderContext; using std::runtime_error; -using std::make_shared; +using namespace std::chrono_literals; namespace search::transactionlog { -VESPA_THREAD_STACK_TAG(domain_commit_executor); - -DomainConfig::DomainConfig() - : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), - _compressionLevel(9), - _partSizeLimit(0x10000000), // 256M - _chunkSizeLimit(0x40000), // 256k - _chunkAgeLimit(10ms) -{ } - -Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool, - Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, - const FileHeaderContext &fileHeaderContext) - : _config(cfg), - _lastSerial(0), - _threadPool(threadPool), - _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)), - _commitExecutor(commitExecutor), - _sessionExecutor(sessionExecutor), - _sessionId(1), - _syncMonitor(), - _pendingSync(false), - _name(domainName), - _parts(), - _lock(), - _currentChunkMonitor(), - _sessionLock(), - _sessions(), - _maxSessionRunTime(), - _baseDir(baseDir), - _fileHeaderContext(fileHeaderContext), - _markedDeleted(false), - _self(nullptr) +Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, + Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, + const FileHeaderContext &fileHeaderContext) : + _defaultCrcType(defaultCrcType), + _commitExecutor(commitExecutor), + _sessionExecutor(sessionExecutor), + _sessionId(1), + _syncMonitor(), + _pendingSync(false), + _name(domainName), + _domainPartSize(domainPartSize), + _parts(), + _lock(), + _sessionLock(), + _sessions(), + _maxSessionRunTime(), + _baseDir(baseDir), + _fileHeaderContext(fileHeaderContext), + _markedDeleted(false) { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { @@ -77,22 +60,13 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo } _sessionExecutor.sync(); if (_parts.empty() || _parts.crbegin()->second->isClosed()) { - _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, false); + _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false); vespalib::File::sync(dir()); } - _lastSerial = end(); -} - -Domain & -Domain::setConfig(const DomainConfig & cfg) { - _config = cfg; - return *this; } void Domain::addPart(int64_t partId, bool isLastPart) { - auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, isLastPart); + auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart); if (dp->size() == 0) { // Only last domain part is allowed to be truncated down to // empty size. @@ -311,21 +285,18 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } -void -Domain::commit(const Packet & packet, Writer::DoneCallback onDone) +void Domain::commit(const Packet & packet) { - (void) onDone; DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); - if (dp->byteSize() > _config.getPartSizeLimit()) { + if (dp->byteSize() > _domainPartSize) { waitPendingSync(_syncMonitor, _pendingSync); triggerSyncNow(); waitPendingSync(_syncMonitor, _pendingSync); dp->close(); - dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, false); + dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _defaultCrcType, _fileHeaderContext, false); { LockGuard guard(_lock); _parts[entry.serial()] = dp; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index cc7f4ac5e4b..d6f964d5140 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -4,45 +4,20 @@ #include "domainpart.h" #include "session.h" #include <vespa/vespalib/util/threadexecutor.h> -#include <vespa/vespalib/util/time.h> -#include <vespa/fastos/thread.h> #include <chrono> namespace search::transactionlog { -class DomainConfig { -public: - using duration = vespalib::duration; - DomainConfig(); - DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } - DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } - DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } - DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } - DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } - Encoding getEncoding() const { return _encoding; } - size_t getPartSizeLimit() const { return _partSizeLimit; } - size_t getChunkSizeLimit() const { return _chunkSizeLimit; } - duration getChunkAgeLimit() const { return _chunkAgeLimit; } - uint8_t getCompressionlevel() const { return _compressionLevel; } -private: - Encoding _encoding; - uint8_t _compressionLevel; - size_t _partSizeLimit; - size_t _chunkSizeLimit; - duration _chunkAgeLimit; -}; - struct PartInfo { SerialNumRange range; size_t numEntries; size_t byteSize; vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) - : range(range_in), - numEntries(numEntries_in), - byteSize(byteSize_in), - file(file_in) - {} + PartInfo(SerialNumRange range_in, size_t numEntries_in, + size_t byteSize_in, + vespalib::stringref file_in) + : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), + file(file_in) {} }; struct DomainInfo { @@ -65,17 +40,17 @@ class Domain public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::SyncableThreadExecutor; - Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool, - Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, + Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, + Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, const common::FileHeaderContext &fileHeaderContext); - ~Domain(); + virtual ~Domain(); DomainInfo getDomainInfo() const; const vespalib::string & name() const { return _name; } bool erase(SerialNum to); - void commit(const Packet & packet, Writer::DoneCallback onDone); + void commit(const Packet & packet); int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest); SerialNum begin() const; @@ -102,7 +77,7 @@ public: return _sessionExecutor.execute(std::move(task)); } uint64_t size() const; - Domain & setConfig(const DomainConfig & cfg); + private: SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; @@ -120,26 +95,22 @@ private: using DomainPartList = std::map<int64_t, DomainPart::SP>; using DurationSeconds = std::chrono::duration<double>; - DomainConfig _config; - SerialNum _lastSerial; - FastOS_ThreadPool & _threadPool; - std::unique_ptr<Executor> _singleCommiter; - Executor & _commitExecutor; - Executor & _sessionExecutor; - std::atomic<int> _sessionId; - vespalib::Monitor _syncMonitor; - bool _pendingSync; - vespalib::string _name; - DomainPartList _parts; - vespalib::Lock _lock; - vespalib::Monitor _currentChunkMonitor; - vespalib::Lock _sessionLock; - SessionList _sessions; - DurationSeconds _maxSessionRunTime; - vespalib::string _baseDir; + DomainPart::Crc _defaultCrcType; + Executor & _commitExecutor; + Executor & _sessionExecutor; + std::atomic<int> _sessionId; + vespalib::Monitor _syncMonitor; + bool _pendingSync; + vespalib::string _name; + uint64_t _domainPartSize; + DomainPartList _parts; + vespalib::Lock _lock; + vespalib::Lock _sessionLock; + SessionList _sessions; + DurationSeconds _maxSessionRunTime; + vespalib::string _baseDir; const common::FileHeaderContext &_fileHeaderContext; - bool _markedDeleted; - FastOS_ThreadInterface * _self; + bool _markedDeleted; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index a6f140fce24..91599f8218a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -176,20 +176,6 @@ handleReadError(const char *text, return retval; } -int32_t -calcCrc(Encoding::Crc version, const void * buf, size_t sz) -{ - if (version == Encoding::Crc::xxh64) { - return static_cast<int32_t>(XXH64(buf, sz, 0ll)); - } else if (version == Encoding::Crc::ccitt_crc32) { - vespalib::crc_32_type calculator; - calculator.process_bytes(buf, sz); - return calculator.checksum(); - } else { - LOG_ABORT("should not be reached"); - } -} - } int64_t @@ -279,23 +265,22 @@ DomainPart::buildPacketMapping(bool allowTruncate) return currPos; } -DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, - uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) - : _encoding(encoding), - _compressionLevel(compressionLevel), - _lock(), - _fileLock(), - _range(s), - _sz(0), - _byteSize(0), - _packets(), - _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)), - _transLog(std::make_unique<FastOS_File>(_fileName.c_str())), - _skipList(), - _headerLen(0), - _writeLock(), - _writtenSerial(0), - _syncedSerial(0) +DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Crc defaultCrc, + const FileHeaderContext &fileHeaderContext, bool allowTruncate) : + _defaultCrc(defaultCrc), + _lock(), + _fileLock(), + _range(s), + _sz(0), + _byteSize(0), + _packets(), + _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)), + _transLog(std::make_unique<FastOS_File>(_fileName.c_str())), + _skipList(), + _headerLen(0), + _writeLock(), + _writtenSerial(0), + _syncedSerial(0) { if (_transLog->OpenReadOnly()) { int64_t currPos = buildPacketMapping(allowTruncate); @@ -597,12 +582,12 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) int32_t crc(0); uint32_t len(entry.serializedSize() + sizeof(crc)); nbostream os; - os << static_cast<uint8_t>(_encoding.getRaw()); + os << static_cast<uint8_t>(_defaultCrc); os << len; size_t start(os.size()); entry.serialize(os); size_t end(os.size()); - crc = calcCrc(_encoding.getCrc(), os.data() + start, end - start); + crc = calcCrc(_defaultCrc, os.data() + start, end - start); os << crc; size_t osSize = os.size(); assert(osSize == len + sizeof(len) + sizeof(uint8_t)); @@ -630,10 +615,10 @@ DomainPart::read(FastOS_FileInterface &file, uint32_t len(0); his >> version >> len; if ((retval = (rlen == sizeof(tmp)))) { - if ( ! (retval = (version == Encoding::Crc::ccitt_crc32) || version == Encoding::Crc::xxh64)) { + if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) { string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," - " got %d from '%s' at position %" PRId64, - version, file.GetFileName(), lastKnownGoodPos)); + " got %d from '%s' at position %" PRId64, + version, file.GetFileName(), lastKnownGoodPos)); if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { LOG(warning, "%s", msg.c_str()); return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); @@ -653,7 +638,7 @@ DomainPart::read(FastOS_FileInterface &file, entry.deserialize(is); int32_t crc(0); is >> crc; - int32_t crcVerify(calcCrc(Encoding(version).getCrc(), buf.get(), len - sizeof(crc))); + int32_t crcVerify(calcCrc(static_cast<Crc>(version), buf.get(), len - sizeof(crc))); if (crc != crcVerify) { throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d", file.GetFileName(), file.GetPosition() - len - sizeof(len), @@ -670,4 +655,17 @@ DomainPart::read(FastOS_FileInterface &file, return retval; } +int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz) +{ + if (version == xxh64) { + return static_cast<int32_t>(XXH64(buf, sz, 0ll)); + } else if (version == ccitt_crc32) { + vespalib::crc_32_type calculator; + calculator.process_bytes(buf, sz); + return calculator.checksum(); + } else { + LOG_ABORT("should not be reached"); + } +} + } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index f3a53c1e9a9..59d0df6df94 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -2,7 +2,6 @@ #pragma once #include "common.h" -#include "ichunk.h" #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/memory.h> #include <map> @@ -20,9 +19,13 @@ private: DomainPart& operator=(const DomainPart &); public: + enum Crc { + ccitt_crc32=1, + xxh64=2 + }; typedef std::shared_ptr<DomainPart> SP; - DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, - uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); + DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc, + const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); ~DomainPart(); @@ -52,6 +55,7 @@ private: static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate); void write(FastOS_FileInterface &file, const Packet::Entry &entry); + static int32_t calcCrc(Crc crc, const void * buf, size_t len); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo @@ -73,22 +77,21 @@ private: }; typedef std::vector<SkipInfo> SkipList; typedef std::map<SerialNum, Packet> PacketList; - const Encoding _encoding; - const uint8_t _compressionLevel; - vespalib::Lock _lock; - vespalib::Lock _fileLock; - SerialNumRange _range; - size_t _sz; + const Crc _defaultCrc; + vespalib::Lock _lock; + vespalib::Lock _fileLock; + SerialNumRange _range; + size_t _sz; std::atomic<uint64_t> _byteSize; - PacketList _packets; - vespalib::string _fileName; + PacketList _packets; + vespalib::string _fileName; std::unique_ptr<FastOS_FileInterface> _transLog; - SkipList _skipList; - uint32_t _headerLen; - vespalib::Lock _writeLock; + SkipList _skipList; + uint32_t _headerLen; + vespalib::Lock _writeLock; // Protected by _writeLock - SerialNum _writtenSerial; - SerialNum _syncedSerial; + SerialNum _writtenSerial; + SerialNum _syncedSerial; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index 5e44815cb1b..4aa1b0a5a90 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -22,7 +22,7 @@ public: lz4 = 2, zstd = 3 }; - explicit Encoding(uint8_t raw) : _raw(raw) { } + Encoding(uint8_t raw) : _raw(raw) {} Encoding(Crc crc, Compression compression); Crc getCrc() const { return Crc(_raw & 0xf); } Compression getCompression() const { return Compression((_raw >> 4) & 0xf); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index 9b8d23371e8..bf35d83c000 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -25,7 +25,7 @@ private: public: class Destination { public: - virtual ~Destination() = default; + virtual ~Destination() {} virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; virtual bool connected() const = 0; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index b98453a7648..caef792704a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" -#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/time.h> @@ -79,25 +78,25 @@ SyncHandler::PerformTask() TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, - DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none)) - .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 100us)) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4) + const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, DomainPart::Crc::xxh64) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads) + const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize, + size_t maxThreads, DomainPart::Crc defaultCrcType) : FRT_Invokable(), _name(name), _baseDir(baseDir), - _domainConfig(cfg), + _domainPartSize(domainPartSize), + _defaultCrcType(defaultCrcType), _commitExecutor(maxThreads, 128*1024), _sessionExecutor(maxThreads, 128*1024), - _threadPool(std::make_unique<FastOS_ThreadPool>(1024*120)), + _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), _transport(std::make_unique<FNET_Transport>()), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _domains(), @@ -113,8 +112,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con domainDir >> domainName; if ( ! domainName.empty()) { try { - auto domain = make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor, - _sessionExecutor, cfg, _fileHeaderContext); + auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, + _domainPartSize, _defaultCrcType,_fileHeaderContext); _domains[domain->name()] = domain; } catch (const std::exception & e) { LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what()); @@ -203,21 +202,10 @@ TransLogServer::run() req->Return(); } } - } while (running() && !(hasPacket && (req == nullptr))); + } while (running() && !(hasPacket && (req == NULL))); LOG(info, "TLS Stopped"); } - -TransLogServer & -TransLogServer::setDomainConfig(const DomainConfig & cfg) { - Guard domainGuard(_lock); - _domainConfig = cfg; - for(auto &domain: _domains) { - domain.second->setConfig(cfg); - } - return *this; -} - DomainStats TransLogServer::getDomainStats() const { @@ -446,8 +434,8 @@ TransLogServer::createDomain(FRT_RPCRequest *req) Domain::SP domain(findDomain(domainName)); if ( !domain ) { try { - domain = std::make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor, - _sessionExecutor, _domainConfig, _fileHeaderContext); + domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, + _domainPartSize, _defaultCrcType, _fileHeaderContext); Guard domainGuard(_lock); _domains[domain->name()] = domain; writeDomainDir(domainGuard, dir(), domainList(), _domains); @@ -556,9 +544,10 @@ TransLogServer::domainStatus(FRT_RPCRequest *req) void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) { + (void) done; Domain::SP domain(findDomain(domainName)); if (domain) { - domain->commit(packet, std::move(done)); + domain->commit(packet); } else { throw IllegalArgumentException("Could not find domain " + domainName); } @@ -575,9 +564,7 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) if (domain) { Packet packet(params[1]._data._buf, params[1]._data._len); try { - vespalib::Gate gate; - domain->commit(packet, make_shared<GateCallback>(gate)); - gate.await(); + domain->commit(packet); ret.AddInt32(0); ret.AddString("ok"); } catch (const std::exception & e) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 3f945977386..0d65f36e07d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -26,15 +26,16 @@ public: typedef std::shared_ptr<TransLogServer> SP; TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); + const common::FileHeaderContext &fileHeaderContext, + uint64_t domainPartSize, size_t maxThreads, DomainPart::Crc defaultCrc); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg); + const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; + void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; - TransLogServer & setDomainConfig(const DomainConfig & cfg); class Session { @@ -81,7 +82,8 @@ private: vespalib::string _name; vespalib::string _baseDir; - DomainConfig _domainConfig; + const uint64_t _domainPartSize; + const DomainPart::Crc _defaultCrcType; vespalib::ThreadStackExecutor _commitExecutor; vespalib::ThreadStackExecutor _sessionExecutor; std::unique_ptr<FastOS_ThreadPool> _threadPool; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index d83623661ff..b8d21fb7465 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -2,7 +2,6 @@ #include "translogserverapp.h" #include <vespa/config/subscription/configuri.h> -#include <vespa/vespalib/util/time.h> #include <vespa/log/log.h> LOG_SETUP(".translogserverapp"); @@ -25,59 +24,16 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, namespace { -Encoding::Crc +DomainPart::Crc getCrc(searchlib::TranslogserverConfig::Crcmethod crcType) { switch (crcType) { case searchlib::TranslogserverConfig::Crcmethod::ccitt_crc32: - return Encoding::Crc::ccitt_crc32; + return DomainPart::ccitt_crc32; case searchlib::TranslogserverConfig::Crcmethod::xxh64: - return Encoding::Crc::xxh64; + return DomainPart::xxh64; } - assert(false); -} - -Encoding::Compression -getCompression(searchlib::TranslogserverConfig::Compression::Type type) -{ - switch (type) { - case searchlib::TranslogserverConfig::Compression::Type::NONE: - return Encoding::Compression::none; - case searchlib::TranslogserverConfig::Compression::Type::NONE_MULTI: - return Encoding::Compression::none_multi; - case searchlib::TranslogserverConfig::Compression::Type::LZ4: - return Encoding::Compression::lz4; - case searchlib::TranslogserverConfig::Compression::Type::ZSTD: - return Encoding::Compression::zstd; - } - assert(false); -} - -Encoding -getEncoding(const searchlib::TranslogserverConfig & cfg) -{ - return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type)); -} - -DomainConfig -getDomainConfig(const searchlib::TranslogserverConfig & cfg) { - DomainConfig dcfg; - dcfg.setEncoding(getEncoding(cfg)) - .setCompressionLevel(cfg.compression.level) - .setPartSizeLimit(cfg.filesizemax) - .setChunkSizeLimit(cfg.chunk.sizelimit) - .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)); - return dcfg; -} - -void -logReconfig(const searchlib::TranslogserverConfig & cfg, const DomainConfig & dcfg) { - LOG(config, "configure Transaction Log Server %s at port %d\n" - "DomainConfig {encoding={%d, %d}, compression_level=%d, part_limit=%ld, chunk_limit=%ld age=%1.4f}", - cfg.servername.c_str(), cfg.listenport, - dcfg.getEncoding().getCrc(), dcfg.getEncoding().getCompression(), dcfg.getCompressionlevel(), - dcfg.getPartSizeLimit(), dcfg.getChunkSizeLimit(), vespalib::to_s(dcfg.getChunkAgeLimit()) - ); + LOG_ABORT("should not be reached"); } } @@ -85,12 +41,11 @@ logReconfig(const searchlib::TranslogserverConfig & cfg, const DomainConfig & dc void TransLogServerApp::start() { + std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get(); + auto tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, + c->filesizemax, c->maxthreads, getCrc(c->crcmethod)); std::lock_guard<std::mutex> guard(_lock); - auto c = _tlsConfig.get(); - DomainConfig domainConfig = getDomainConfig(*c); - logReconfig(*c, domainConfig); - _tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, - domainConfig, c->maxthreads); + _tls = std::move(tls); } TransLogServerApp::~TransLogServerApp() @@ -101,15 +56,9 @@ TransLogServerApp::~TransLogServerApp() void TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) { - - std::lock_guard<std::mutex> guard(_lock); - DomainConfig dcfg = getDomainConfig(*cfg); - logReconfig(*cfg, dcfg); + LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport); _tlsConfig.set(cfg.release()); _tlsConfig.latch(); - if (_tls) { - _tls->setDomainConfig(dcfg); - } } TransLogServer::SP |