diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-14 17:52:55 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-14 17:52:55 +0100 |
commit | 63966afece7d250b4ba4c03ad1f43ee7457b1dec (patch) | |
tree | fbdc3cdf92e3ed3bb2c0b657586d9885010323a7 /searchlib/src | |
parent | f909e2e5f34357af78e28dd4d948134a0fee50aa (diff) |
Revert "Revert "Revert "Balder/group multiple commits rebased 1"""
Diffstat (limited to 'searchlib/src')
19 files changed, 621 insertions, 1180 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index f88b798370c..861023b79b7 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -5,6 +5,7 @@ #include <vespa/vespalib/objects/identifiable.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/fastos/file.h> +#include <map> #include <vespa/log/log.h> LOG_SETUP("translogclient_test"); @@ -13,24 +14,9 @@ using namespace search; using namespace transactionlog; using namespace document; using namespace vespalib; -using namespace std::chrono_literals; using search::index::DummyFileHeaderContext; -namespace { - -bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); -TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); -bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); -void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); -void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); -uint32_t countFiles(const vespalib::string &dir); -void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); -bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); -void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains); -void verifyDomain(const vespalib::string & name); - -vespalib::string -myhex(const void * b, size_t sz) +vespalib::string myhex(const void * b, size_t sz) { static const char * hextab="0123456789ABCDEF"; const unsigned char * c = static_cast<const unsigned char *>(b); @@ -43,6 +29,35 @@ myhex(const void * b, size_t sz) return s; } +class Test : public vespalib::TestApp +{ +public: + int Main() override; +private: + bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); + TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); + bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); + void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); + void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); + uint32_t countFiles(const vespalib::string &dir); + void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); + bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); + bool partialUpdateTest(); + bool testVisitOverGeneratedDomain(); + bool testRemove(); + void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains); + void verifyDomain(const vespalib::string & name); + void testCrcVersions(); + bool testVisitOverPreExistingDomain(); + void testMany(); + void testErase(); + void testSync(); + void testTruncateOnShortRead(); + void testTruncateOnVersionMismatch(); +}; + +TEST_APPHOOK(Test); + class CallBackTest : public TransLogClient::Visitor::Callback { private: @@ -60,8 +75,7 @@ public: bool _eof; }; -RPC::Result -CallBackTest::receive(const Packet & p) +RPC::Result CallBackTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().c_str(), p.getHandle().size()); LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(h.peek(), h.size()).c_str()); @@ -87,8 +101,7 @@ public: size_t _value; }; -RPC::Result -CallBackManyTest::receive(const Packet & p) +RPC::Result CallBackManyTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().c_str(), p.getHandle().size()); for(;h.size() > 0; _count++, _value++) { @@ -120,8 +133,7 @@ public: }; -RPC::Result -CallBackUpdate::receive(const Packet & packet) +RPC::Result CallBackUpdate::receive(const Packet & packet) { nbostream_longlivedbuf h(packet.getHandle().c_str(), packet.getHandle().size()); while (h.size() > 0) { @@ -173,8 +185,7 @@ public: SerialNum _prevSerial; }; -RPC::Result -CallBackStatsTest::receive(const Packet & p) +RPC::Result CallBackStatsTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().c_str(), p.getHandle().size()); for(;h.size() > 0; ++_count) { @@ -210,10 +221,67 @@ public: IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable); -constexpr size_t DEFAULT_PACKET_SIZE = 0xf000; +bool Test::partialUpdateTest() +{ + bool retval(false); + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); + TransLogClient::Session & session = *s1; + + TestIdentifiable du; + + nbostream os; + os << du; + + vespalib::ConstBufferRef bb(os.c_str(), os.size()); + LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); + Packet::Entry e(7, du.getClass().id(), bb); + Packet pa; + pa.add(e); + pa.close(); + ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().c_str(), pa.getHandle().size()))); + + CallBackUpdate ca; + TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(5, 7) ); + for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ca._eof ); + ASSERT_TRUE( ca.map().size() == 1); + ASSERT_TRUE( ca.hasSerial(7) ); + + CallBackUpdate ca1; + TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); + ASSERT_TRUE(visitor1.get()); + ASSERT_TRUE( visitor1->visit(4, 5) ); + for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ca1._eof ); + ASSERT_TRUE( ca1.map().size() == 0); + + CallBackUpdate ca2; + TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); + ASSERT_TRUE(visitor2.get()); + ASSERT_TRUE( visitor2->visit(5, 6) ); + for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ca2._eof ); + ASSERT_TRUE( ca2.map().size() == 0); + + CallBackUpdate ca3; + TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); + ASSERT_TRUE(visitor3.get()); + ASSERT_TRUE( visitor3->visit(5, 1000) ); + for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ca3._eof ); + ASSERT_TRUE( ca3.map().size() == 1); + ASSERT_TRUE( ca3.hasSerial(7) ); -bool -createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) + return retval; +} + +bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) { bool retval(true); std::vector<vespalib::string> dir; @@ -230,40 +298,43 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre return retval; } -TransLogClient::Session::UP -openDomainTest(TransLogClient & tls, const vespalib::string & name) +TransLogClient::Session::UP Test::openDomainTest(TransLogClient & tls, const vespalib::string & name) { TransLogClient::Session::UP s1 = tls.open(name); ASSERT_TRUE (s1.get() != NULL); return s1; } -bool -fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) +bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) { bool retval(true); Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20)); Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20)); Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20)); - Packet a(DEFAULT_PACKET_SIZE); - a.add(e1); - Packet b(DEFAULT_PACKET_SIZE); - b.add(e2); - b.add(e3); - EXPECT_EXCEPTION(b.add(e1), std::runtime_error, ""); + Packet a; + ASSERT_TRUE (a.add(e1)); + Packet b; + ASSERT_TRUE (b.add(e2)); + ASSERT_TRUE (b.add(e3)); + ASSERT_TRUE (!b.add(e1)); + a.close(); + b.close(); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size()))); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().c_str(), b.getHandle().size()))); - EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size())), - std::runtime_error, - "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."); + try { + s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size())); + ASSERT_TRUE(false); + } catch (const std::exception & e) { + EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what()); + } EXPECT_EQUAL(a.size(), 1u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 1u); EXPECT_EQUAL(b.size(), 2u); EXPECT_EQUAL(b.range().from(), 2u); EXPECT_EQUAL(b.range().to(), 3u); - a.merge(b); + EXPECT_TRUE(a.merge(b)); EXPECT_EQUAL(a.size(), 3u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 3u); @@ -278,80 +349,52 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) return retval; } -void -fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) +void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) { size_t value(0); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); + std::unique_ptr<Packet> p(new Packet()); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - p->add(e); - if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { + if ( ! p->add(e) ) { + p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); - p.reset(new Packet(DEFAULT_PACKET_SIZE)); + p.reset(new Packet()); + ASSERT_TRUE(p->add(e)); } } + p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); } } -using Counter = std::atomic<size_t>; - -class CountDone : public IDestructorCallback { -public: - CountDone(Counter & inFlight) : _inFlight(inFlight) { ++_inFlight; } - ~CountDone() override { --_inFlight; } -private: - Counter & _inFlight; -}; - -void -fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries) -{ - size_t value(0); - Counter inFlight(0); - for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); - for(size_t j=0; j < numEntries; j++, value++) { - Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - p->add(e); - if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { - s1.commit(domain, *p, std::make_shared<CountDone>(inFlight)); - p.reset(new Packet(DEFAULT_PACKET_SIZE)); - } - } - s1.commit(domain, *p, std::make_shared<CountDone>(inFlight)); - LOG(info, "Inflight %ld", inFlight.load()); - } - while (inFlight.load() != 0) { - std::this_thread::sleep_for(10ms); - LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load()); - } - -} void -fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) +Test::fillDomainTest(TransLogClient::Session * s1, + size_t numPackets, size_t numEntries, + size_t entrySize) { size_t value(0); std::vector<char> entryBuffer(entrySize); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); + std::unique_ptr<Packet> p(new Packet()); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size())); - p->add(e); - if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { + if ( ! p->add(e) ) { + p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); - p.reset(new Packet(DEFAULT_PACKET_SIZE)); + p.reset(new Packet()); + ASSERT_TRUE(p->add(e)); } } + p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); } } + uint32_t -countFiles(const vespalib::string &dir) +Test::countFiles(const vespalib::string &dir) { uint32_t res = 0; FastOS_DirectoryScan dirScan(dir.c_str()); @@ -365,8 +408,10 @@ countFiles(const vespalib::string &dir) return res; } + void -checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) +Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, + size_t numEntries) { SerialNum b(0), e(0); size_t c(0); @@ -376,8 +421,8 @@ checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) EXPECT_EQUAL(c, numEntries); } -bool -visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) + +bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) { bool retval(true); @@ -392,7 +437,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal TransLogClient::Visitor::UP visitor = tls.createVisitor(name, ca); ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(0, 1) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ca.hasSerial(1) ); @@ -402,7 +447,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal visitor = tls.createVisitor(name, ca); ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(1, 2) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ! ca.hasSerial(1) ); @@ -413,7 +458,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal visitor = tls.createVisitor(name, ca); EXPECT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(0, 3) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ca.hasSerial(1) ); @@ -424,7 +469,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal visitor = tls.createVisitor(name, ca); ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(2, 3) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( !ca.hasSerial(1) ); @@ -441,31 +486,10 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain) return tls.getDomainStats()[domain].maxSessionRunTime.count(); } -void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains) +bool Test::testVisitOverGeneratedDomain() { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, - DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4); - TransLogClient tls("tcp/localhost:18377"); - - createDomainTest(tls, name, preExistingDomains); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); - fillDomainTest(s1.get(), name); -} - -void verifyDomain(const vespalib::string & name) { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); - visitDomainTest(tls, s1.get(), name); -} - -} - -TEST("testVisitOverGeneratedDomain") { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -477,85 +501,42 @@ TEST("testVisitOverGeneratedDomain") { double maxSessionRunTime = getMaxSessionRunTime(tlss, "test1"); LOG(info, "testVisitOverGeneratedDomain(): maxSessionRunTime=%f", maxSessionRunTime); EXPECT_GREATER(maxSessionRunTime, 0); + return true; } -TEST("testVisitOverPreExistingDomain") { - // Depends on Test::testVisitOverGeneratedDomain() +void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains) +{ DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, crcMethod); TransLogClient tls("tcp/localhost:18377"); - vespalib::string name("test1"); + createDomainTest(tls, name, preExistingDomains); TransLogClient::Session::UP s1 = openDomainTest(tls, name); - visitDomainTest(tls, s1.get(), name); + fillDomainTest(s1.get(), name); } -TEST("partialUpdateTest") { +void Test::verifyDomain(const vespalib::string & name) +{ DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls("tcp/localhost:18377"); - - TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); - TransLogClient::Session & session = *s1; - - TestIdentifiable du; - - nbostream os; - os << du; - - vespalib::ConstBufferRef bb(os.c_str(), os.size()); - LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); - Packet::Entry e(7, du.getClass().id(), bb); - Packet pa(DEFAULT_PACKET_SIZE); - pa.add(e); - ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().c_str(), pa.getHandle().size()))); - - CallBackUpdate ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(5, 7) ); - for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - ASSERT_TRUE( ca.map().size() == 1); - ASSERT_TRUE( ca.hasSerial(7) ); - - CallBackUpdate ca1; - TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); - ASSERT_TRUE(visitor1.get()); - ASSERT_TRUE( visitor1->visit(4, 5) ); - for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca1._eof ); - ASSERT_TRUE( ca1.map().size() == 0); - - CallBackUpdate ca2; - TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); - ASSERT_TRUE(visitor2.get()); - ASSERT_TRUE( visitor2->visit(5, 6) ); - for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca2._eof ); - ASSERT_TRUE( ca2.map().size() == 0); - - CallBackUpdate ca3; - TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); - ASSERT_TRUE(visitor3.get()); - ASSERT_TRUE( visitor3->visit(5, 1000) ); - for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca3._eof ); - ASSERT_TRUE( ca3.map().size() == 1); - ASSERT_TRUE( ca3.hasSerial(7) ); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + visitDomainTest(tls, s1.get(), name); } -TEST("testCrcVersions") { - createAndFillDomain("ccitt_crc32", Encoding::Crc::ccitt_crc32, 0); - createAndFillDomain("xxh64", Encoding::Crc::xxh64, 1); +void Test::testCrcVersions() +{ + createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0); + createAndFillDomain("xxh64", DomainPart::xxh64, 1); verifyDomain("ccitt_crc32"); verifyDomain("xxh64"); } -TEST("testRemove") { +bool Test::testRemove() +{ DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test-delete"); @@ -564,6 +545,21 @@ TEST("testRemove") { fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); ASSERT_TRUE(tls.remove(name)); + + return true; +} + +bool Test::testVisitOverPreExistingDomain() +{ + // Depends on Test::testVisitOverGeneratedDomain() + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + + vespalib::string name("test1"); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + visitDomainTest(tls, s1.get(), name); + return true; } namespace { @@ -579,7 +575,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain, ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(visitStart, visitEnd) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { - std::this_thread::sleep_for(10ms); + FastOS_Thread::Sleep(10); } ASSERT_TRUE(ca._eof); EXPECT_EQUAL(expFirstSerial, ca._firstSerial); @@ -604,19 +600,18 @@ assertStatus(TransLogClient::Session &s, } -TEST("test sending a lot of data") { +void Test::testMany() +{ const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; - const vespalib::string MANY("many"); { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000) - .setChunkAgeLimit(100us)); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000); TransLogClient tls("tcp/localhost:18377"); - createDomainTest(tls, MANY, 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + createDomainTest(tls, "many", 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); @@ -625,52 +620,20 @@ TEST("test sending a lot of data") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); - } - { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); - - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); - SerialNum b(0), e(0); - size_t c(0); - EXPECT_TRUE(s1->status(b, e, c)); - EXPECT_EQUAL(b, 1u); - EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); - CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } ASSERT_TRUE( ca._eof ); EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); } -} - -TEST("test sending a lot of data async") { - const unsigned int NUM_PACKETS = 1000; - const unsigned int NUM_ENTRIES = 100; - const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; - const vespalib::string MANY("many-async"); - { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000) - .setChunkAgeLimit(10ms)); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); - createDomainTest(tls, MANY, 1); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); - fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES); + TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); SerialNum b(0), e(0); size_t c(0); EXPECT_TRUE(s1->status(b, e, c)); @@ -678,44 +641,24 @@ TEST("test sending a lot of data async") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); - } - { - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); - TransLogClient tls("tcp/localhost:18377"); - - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); - SerialNum b(0), e(0); - size_t c(0); - EXPECT_TRUE(s1->status(b, e, c)); - EXPECT_EQUAL(b, 1u); - EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); - EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); - CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); - for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } ASSERT_TRUE( ca._eof ); EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); } } -TEST("testErase") { +void Test::testErase() +{ const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); @@ -724,7 +667,7 @@ TEST("testErase") { } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); @@ -805,13 +748,16 @@ TEST("testErase") { } } -TEST("testSync") { + +void +Test::testSync() +{ const unsigned int NUM_PACKETS = 3; const unsigned int NUM_ENTRIES = 4; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -824,7 +770,10 @@ TEST("testSync") { EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES); } -TEST("test truncate on version mismatch") { + +void +Test::testTruncateOnVersionMismatch() +{ const unsigned int NUM_PACKETS = 3; const unsigned int NUM_ENTRIES = 4; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; @@ -833,7 +782,7 @@ TEST("test truncate on version mismatch") { size_t countOld(0); DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -854,7 +803,7 @@ TEST("test truncate on version mismatch") { EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp))); EXPECT_TRUE(f.Close()); { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); @@ -866,7 +815,9 @@ TEST("test truncate on version mismatch") { } } -TEST("test trucation after short read") { +void +Test::testTruncateOnShortRead() +{ const unsigned int NUM_PACKETS = 17; const unsigned int NUM_ENTRIES = 1; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; @@ -878,7 +829,7 @@ TEST("test trucation after short read") { DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); @@ -894,7 +845,7 @@ TEST("test trucation after short read") { EXPECT_EQUAL(2u, countFiles(dir)); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); @@ -910,7 +861,7 @@ TEST("test trucation after short read") { trfile.Close(); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); @@ -920,4 +871,28 @@ TEST("test trucation after short read") { } } -TEST_MAIN() { TEST_RUN_ALL(); } + +int Test::Main() +{ + TEST_INIT("translogclient_test"); + + if (_argc > 0) { + DummyFileHeaderContext::setCreator(_argv[0]); + } + testVisitOverGeneratedDomain(); + testVisitOverPreExistingDomain(); + testMany(); + testErase(); + partialUpdateTest(); + + testRemove(); + + testSync(); + + testTruncateOnShortRead(); + testTruncateOnVersionMismatch(); + + testCrcVersions(); + + TEST_DONE(); +} diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 6f2581d3799..abba84b75b6 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -8,6 +8,7 @@ #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/fastos/app.h> #include <iostream> +#include <stdexcept> #include <sstream> #include <vespa/log/log.h> @@ -220,6 +221,7 @@ FeederThread::~FeederThread() {} void FeederThread::commitPacket() { + _packet.close(); const vespalib::nbostream& stream = _packet.getHandle(); if (!_session->commit(ConstBufferRef(stream.c_str(), stream.size()))) { throw std::runtime_error(vespalib::make_string @@ -234,9 +236,8 @@ FeederThread::commitPacket() bool FeederThread::addEntry(const Packet::Entry & e) { - if (_packet.sizeBytes() > 0xf000) return false; - _packet.add(e); - return true; + //LOG(info, "FeederThread: add %s", EntryPrinter::toStr(e).c_str()); + return _packet.add(e); } void @@ -698,7 +699,7 @@ TransLogStress::Main() // start transaction log server DummyFileHeaderContext fileHeaderContext; - TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); + TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize); TransLogClient client(tlsSpec); client.create(domain); diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index 81ab5f6e4fa..74efe3fe68e 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -5,7 +5,7 @@ namespace=searchlib listenport int default=13700 restart ## Max file size (50M) -filesizemax int default=50000000 +filesizemax int default=50000000 restart ## Server name to identify server. servername string default="tls" restart @@ -22,17 +22,3 @@ maxthreads int default=4 restart ##Default crc method used crcmethod enum {ccitt_crc32, xxh64} default=xxh64 - -## Control compression type. -compression.type enum {NONE, LZ4, ZSTD} default=LZ4 - -## Control compression level -## LZ4 has normal range 1..9 while ZSTD has range 1..19 -## 9 is a reasonable default for both -compression.level int default=9 - -## How large a chunk can grow in memory before beeing flushed -chunk.sizelimit int default = 256000 # 256k - -## How long a chunk can reside in memory befor ebeeing flushed to disk. -chunk.agelimit double default = 0.010 # 10 milliseconds diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index 0755d07b403..d964c88fe29 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -1,11 +1,9 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchlib_transactionlog OBJECT SOURCES - chunks.cpp common.cpp domain.cpp domainpart.cpp - ichunk.cpp nosyncproxy.cpp session.cpp trans_log_server_explorer.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp deleted file mode 100644 index dea0ccf9b9a..00000000000 --- a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "chunks.h" -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/util/compressor.h> -#include <vespa/vespalib/data/databuffer.h> - -using std::runtime_error; -using std::make_unique; -using vespalib::make_string; -using vespalib::compression::compress; -using vespalib::compression::decompress; -using vespalib::compression::CompressionConfig; -using vespalib::DataBuffer; -using vespalib::ConstBufferRef; -using vespalib::nbostream; - -namespace search::transactionlog { - -namespace { -void verifyCrc(nbostream & is, Encoding::Crc crcType) { - if (is.size() < sizeof(int32_t) * 2) { - throw runtime_error(make_string("Not even room for the crc and length. Only %zu bytes left", is.size())); - } - size_t start = is.rp(); - is.adjustReadPos(is.size() - sizeof(int32_t)); - int32_t crc(0); - is >> crc; - is.rp(start); - int32_t crcVerify = Encoding::calcCrc(crcType, is.c_str(), is.size() - sizeof(crc)); - if (crc != crcVerify) { - throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d", - static_cast<int>(crcVerify), static_cast<int>(crc))); - } - is.rp(start); -} - -Encoding::Compression -toCompression(CompressionConfig::Type type) { - switch (type) { - case CompressionConfig::ZSTD: - return Encoding::Compression::zstd; - case CompressionConfig::LZ4: - return Encoding::Compression::lz4; - case CompressionConfig::NONE: - return Encoding::Compression::none; - default: - abort(); - } -} - -} - -Encoding -CCITTCRC32None::onEncode(nbostream &os) const { - size_t start = os.wp(); - assert(getEntries().size() == 1); - serializeEntries(os); - os << int32_t(Encoding::calcCrc(Encoding::Crc::ccitt_crc32, os.c_str()+start, os.size() - start)); - return Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none); -} - -void CCITTCRC32None::onDecode(nbostream &is) { - verifyCrc(is, Encoding::Crc::ccitt_crc32); - nbostream data(is.peek(), is.size() - sizeof(int32_t)); - deserializeEntries(data); - is.adjustReadPos(is.size()); -} - -Encoding -XXH64None::onEncode(nbostream &os) const { - size_t start = os.wp(); - assert(getEntries().size() == 1); - serializeEntries(os); - os << int32_t(Encoding::calcCrc(Encoding::Crc::xxh64, os.c_str()+start, os.size() - start)); - return Encoding(Encoding::Crc::xxh64, Encoding::Compression::none); -} - -void XXH64None::onDecode(nbostream &is) { - verifyCrc(is, Encoding::Crc::xxh64); - nbostream data(is.peek(), is.size() - sizeof(int32_t)); - deserializeEntries(data); - is.adjustReadPos(is.size()); -} - -void -XXH64Compressed::decompress(nbostream & is) { - uint32_t uncompressedLen; - is >> uncompressedLen; - vespalib::DataBuffer uncompressed; - ConstBufferRef compressed(is.peek(), is.size()-sizeof(uint32_t)*2); - ::decompress(_type, uncompressedLen, compressed, uncompressed, false); - nbostream data(uncompressed.getData(), uncompressed.getDataLen()); - deserializeEntries(data); - is.adjustReadPos(is.size()); -} - -XXH64Compressed::XXH64Compressed(CompressionConfig::Type type, uint8_t level) - : _type(type), - _level(level) -{ } - -Encoding -XXH64Compressed::compress(nbostream & os, Encoding::Crc crc) const { - nbostream org; - serializeEntries(org); - DataBuffer compressed; - CompressionConfig cfg(_type, _level, 80); - ConstBufferRef uncompressed(org.c_str(), org.size()); - Encoding::Compression actual = toCompression(::compress(cfg, uncompressed, compressed, false)); - size_t start = os.wp(); - os.write(compressed.getData(), compressed.getDataLen()); - os << int32_t(Encoding::calcCrc(crc, os.c_str()+start, os.size() - start)); - return Encoding(Encoding::Crc::xxh64, actual); -} - -Encoding -XXH64Compressed::onEncode(IChunk::nbostream &os) const { - return compress(os, Encoding::Crc::xxh64); -} - -void -XXH64Compressed::onDecode(IChunk::nbostream &is) { - verifyCrc(is, Encoding::Crc::xxh64); - decompress(is); -} - -} diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h deleted file mode 100644 index cf88bc0a3ed..00000000000 --- a/searchlib/src/vespa/searchlib/transactionlog/chunks.h +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "ichunk.h" -#include <vespa/vespalib/util/compressionconfig.h> - -namespace search::transactionlog { - -class XXH64None : public IChunk { -protected: - Encoding onEncode(nbostream &os) const override; - void onDecode(nbostream &is) override; -public: -}; - -class CCITTCRC32None : public IChunk { -protected: - Encoding onEncode(nbostream &os) const override; - void onDecode(nbostream &is) override; -public: -}; - -class XXH64Compressed : public IChunk { -public: - using CompressionConfig = vespalib::compression::CompressionConfig; - XXH64Compressed(CompressionConfig::Type, uint8_t level); -protected: - void decompress(nbostream & os); - Encoding compress(nbostream & os, Encoding::Crc crc) const; - Encoding onEncode(nbostream &os) const override; - void onDecode(nbostream &is) override; -private: - CompressionConfig::Type _type; - uint8_t _level; -}; - -} diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp index 9bf43c8e244..a84e27b2e53 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp @@ -1,29 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "common.h" -#include <vespa/vespalib/util/stringfmt.h> #include <vespa/fastos/file.h> namespace search::transactionlog { using vespalib::nbostream; using vespalib::nbostream_longlivedbuf; -using vespalib::make_string; -using std::runtime_error; -namespace { - -void throwRangeError(SerialNum prev, SerialNum next) __attribute__((noinline)); - -void throwRangeError(SerialNum prev, SerialNum next) { - if (prev < next) return; - throw runtime_error(make_string("The new serialnum %zu is not higher than the old one %zu", next, prev)); -} - -} - -int -makeDirectory(const char * dir) +int makeDirectory(const char * dir) { int retval(-1); @@ -37,8 +22,7 @@ makeDirectory(const char * dir) return retval; } -int64_t -SerialNumRange::cmp(const SerialNumRange & b) const +int64_t SerialNumRange::cmp(const SerialNumRange & b) const { int64_t diff(0); if ( ! (contains(b) || b.contains(*this)) ) { @@ -50,6 +34,7 @@ SerialNumRange::cmp(const SerialNumRange & b) const Packet::Packet(const void * buf, size_t sz) : _count(0), _range(), + _limit(sz), _buf(static_cast<const char *>(buf), sz) { nbostream_longlivedbuf os(_buf.c_str(), sz); @@ -64,22 +49,18 @@ Packet::Packet(const void * buf, size_t sz) : } } -void -Packet::merge(const Packet & packet) +bool Packet::merge(const Packet & packet) { - if (_range.to() >= packet.range().from()) { - throwRangeError(_range.to(), packet.range().from()); - } - if (_buf.empty()) { - _range.from(packet.range().from()); + bool retval(_range.to() < packet._range.from()); + if (retval) { + _count += packet._count; + _range.to(packet._range.to()); + _buf.write(packet.getHandle().c_str(), packet.getHandle().size()); } - _count += packet._count; - _range.to(packet._range.to()); - _buf.write(packet.getHandle().c_str(), packet.getHandle().size()); + return retval; } -nbostream & -Packet::Entry::deserialize(nbostream & os) +nbostream & Packet::Entry::deserialize(nbostream & os) { _valid = false; int32_t len(0); @@ -90,8 +71,7 @@ Packet::Entry::deserialize(nbostream & os) return os; } -nbostream & -Packet::Entry::serialize(nbostream & os) const +nbostream & Packet::Entry::serialize(nbostream & os) const { os << _unique << _type << static_cast<uint32_t>(_data.size()); os.write(_data.c_str(), _data.size()); @@ -103,21 +83,22 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) : _type(t), _valid(true), _data(d) -{ } - -void -Packet::add(const Packet::Entry & e) { - if (_range.to() >= e.serial()) { - throwRangeError(_range.to(), e.serial()); - } +} + - if (_buf.empty()) { - _range.from(e.serial()); +bool Packet::add(const Packet::Entry & e) +{ + bool retval((_buf.size() < _limit) && (_range.to() < e.serial())); + if (retval) { + if (_buf.empty()) { + _range.from(e.serial()); + } + e.serialize(_buf); + _count++; + _range.to(e.serial()); } - e.serialize(_buf); - _count++; - _range.to(e.serial()); + return retval; } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 0deceb2668a..db8b9727daa 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -69,19 +69,21 @@ public: vespalib::ConstBufferRef _data; }; public: - Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { } + Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { } Packet(const void * buf, size_t sz); - void add(const Entry & data); + bool add(const Entry & data); + void close() { } void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); } const SerialNumRange & range() const { return _range; } const vespalib::nbostream & getHandle() const { return _buf; } size_t size() const { return _count; } bool empty() const { return _count == 0; } size_t sizeBytes() const { return _buf.size(); } - void merge(const Packet & packet); + bool merge(const Packet & packet); private: size_t _count; SerialNumRange _range; + size_t _limit; vespalib::nbostream_longlivedbuf _buf; }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index e29adb56dd4..88c2dd9ecc3 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -20,41 +20,28 @@ using vespalib::MonitorGuard; using search::common::FileHeaderContext; using std::runtime_error; using namespace std::chrono_literals; -using namespace std::chrono; -using std::make_shared; namespace search::transactionlog { -DomainConfig::DomainConfig() - : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), - _compressionLevel(9), - _partSizeLimit(0x10000000), // 256M - _chunkSizeLimit(0x40000), // 256k - _chunkAgeLimit(10ms) -{ } -Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool, - Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, - const FileHeaderContext &fileHeaderContext) - : _config(cfg), - _currentChunk(std::make_unique<Chunk>()), - _lastSerial(0), - _threadPool(threadPool), - _commitExecutor(commitExecutor), - _sessionExecutor(sessionExecutor), - _sessionId(1), - _syncMonitor(), - _pendingSync(false), - _name(domainName), - _parts(), - _lock(), - _currentChunkMonitor(), - _sessionLock(), - _sessions(), - _maxSessionRunTime(), - _baseDir(baseDir), - _fileHeaderContext(fileHeaderContext), - _markedDeleted(false), - _self(nullptr) +Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, + Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, + const FileHeaderContext &fileHeaderContext) : + _defaultCrcType(defaultCrcType), + _commitExecutor(commitExecutor), + _sessionExecutor(sessionExecutor), + _sessionId(1), + _syncMonitor(), + _pendingSync(false), + _name(domainName), + _domainPartSize(domainPartSize), + _parts(), + _lock(), + _sessionLock(), + _sessions(), + _maxSessionRunTime(), + _baseDir(baseDir), + _fileHeaderContext(fileHeaderContext), + _markedDeleted(false) { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { @@ -72,34 +59,12 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo } _sessionExecutor.sync(); if (_parts.empty() || _parts.crbegin()->second->isClosed()) { - _parts[lastPart] = make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, false); + _parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false)); } - _lastSerial = end(); - _self = _threadPool.NewThread(this); - assert(_self); } -Domain & -Domain::setConfig(const DomainConfig & cfg) { - _config = cfg; - return *this; -} - -void -Domain::Run(FastOS_ThreadInterface *thisThread, void *) { - - while (!thisThread->GetBreakFlag()) { - vespalib::MonitorGuard guard(_currentChunkMonitor); - guard.wait(duration_cast<milliseconds>(_config.getChunkAgeLimit()).count()); - commitIfStale(guard); - } -} - -void -Domain::addPart(int64_t partId, bool isLastPart) { - auto dp = make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, isLastPart); +void Domain::addPart(int64_t partId, bool isLastPart) { + DomainPart::SP dp(new DomainPart(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart)); if (dp->size() == 0) { // Only last domain part is allowed to be truncated down to // empty size. @@ -137,16 +102,7 @@ private: bool & _pendingSync; }; -Domain::~Domain() { - if (_self) { - _self->SetBreakFlag(); - { - MonitorGuard guard(_currentChunkMonitor); - guard.broadcast(); - } - _self->Join(); - } -} +Domain::~Domain() { } DomainInfo Domain::getDomainInfo() const @@ -173,7 +129,7 @@ Domain::begin(const LockGuard & guard) const assert(guard.locks(_lock)); SerialNum s(0); if ( ! _parts.empty() ) { - s = _parts.cbegin()->second->range().from(); + s = _parts.begin()->second->range().from(); } return s; } @@ -191,7 +147,7 @@ Domain::end(const LockGuard & guard) const assert(guard.locks(_lock)); SerialNum s(0); if ( ! _parts.empty() ) { - s = _parts.crbegin()->second->range().to(); + s = _parts.rbegin()->second->range().to(); } return s; } @@ -245,8 +201,7 @@ Domain::triggerSyncNow() } } -DomainPart::SP -Domain::findPart(SerialNum s) +DomainPart::SP Domain::findPart(SerialNum s) { LockGuard guard(_lock); DomainPartList::iterator it(_parts.upper_bound(s)); @@ -263,14 +218,12 @@ Domain::findPart(SerialNum s) return DomainPart::SP(); } -uint64_t -Domain::size() const +uint64_t Domain::size() const { return size(LockGuard(_lock)); } -uint64_t -Domain::size(const LockGuard & guard) const +uint64_t Domain::size(const LockGuard & guard) const { (void) guard; assert(guard.locks(_lock)); @@ -281,8 +234,7 @@ Domain::size(const LockGuard & guard) const return sz; } -SerialNum -Domain::findOldestActiveVisit() const +SerialNum Domain::findOldestActiveVisit() const { SerialNum oldestActive(std::numeric_limits<SerialNum>::max()); LockGuard guard(_sessionLock); @@ -295,8 +247,7 @@ Domain::findOldestActiveVisit() const return oldestActive; } -void -Domain::cleanSessions() +void Domain::cleanSessions() { if ( _sessions.empty()) { return; @@ -316,8 +267,7 @@ Domain::cleanSessions() namespace { -void -waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) +void waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) { MonitorGuard guard(syncMonitor); while (pendingSync) { @@ -327,86 +277,18 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } -Domain::Chunk::Chunk() - : _data(size_t(-1)), - _callBacks(), - _firstArrivalTime() -{} - -Domain::Chunk::~Chunk() = default; - -void -Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) { - if (_callBacks.empty()) { - _firstArrivalTime = steady_clock::now(); - } - _data.merge(packet); - _callBacks.emplace_back(std::move(onDone)); -} - -microseconds -Domain::Chunk::age() const { - if (_callBacks.empty()) { - return 0ms; - } - return duration_cast<microseconds>(steady_clock::now() - _firstArrivalTime); -} - -void -Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { - vespalib::MonitorGuard guard(_currentChunkMonitor); - if (! (_lastSerial < packet.range().from())) { - throw runtime_error(make_string("Incomming serial number(%ld) must be bigger than the last one (%ld).", - packet.range().from(), _lastSerial)); - } else { - _lastSerial = packet.range().to(); - } - _currentChunk->add(packet, std::move(onDone)); - commitIfFull(guard); -} - -void -Domain::commitIfFull(const vespalib::MonitorGuard &guard) { - if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) { - auto completed = grabCurrentChunk(guard); - if (completed) { - commitChunk(std::move(completed), guard); - } - } -} - -std::unique_ptr<Domain::Chunk> -Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) { - assert(guard.monitors(_currentChunkMonitor)); - auto chunk = std::move(_currentChunk); - _currentChunk = std::make_unique<Chunk>(); - return chunk; -} - -void -Domain::commitIfStale(const vespalib::MonitorGuard & guard) { - assert(guard.monitors(_currentChunkMonitor)); - if (_currentChunk->age() > _config.getChunkAgeLimit()) { - commitChunk(grabCurrentChunk(guard), guard); - } -} - -void -Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard) +void Domain::commit(const Packet & packet) { - assert(chunkOrderGuard.monitors(_currentChunkMonitor)); - const Packet & packet = chunk->getPacket(); DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().c_str(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); - if (dp->byteSize() > _config.getPartSizeLimit()) { + if (dp->byteSize() > _domainPartSize) { waitPendingSync(_syncMonitor, _pendingSync); triggerSyncNow(); waitPendingSync(_syncMonitor, _pendingSync); dp->close(); - dp = make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(), - _config.getCompressionlevel(), _fileHeaderContext, false); + dp.reset(new DomainPart(_name, dir(), entry.serial(), _defaultCrcType, _fileHeaderContext, false)); { LockGuard guard(_lock); _parts[entry.serial()] = dp; @@ -417,8 +299,7 @@ Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & cleanSessions(); } -bool -Domain::erase(SerialNum to) +bool Domain::erase(SerialNum to) { bool retval(true); /// Do not erase the last element @@ -436,9 +317,8 @@ Domain::erase(SerialNum to) return retval; } -int -Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, - FRT_Supervisor & supervisor, FNET_Connection *conn) +int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, + FRT_Supervisor & supervisor, FNET_Connection *conn) { assert(this == domain.get()); cleanSessions(); @@ -449,8 +329,7 @@ Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, return session->id(); } -int -Domain::startSession(int sessionId) +int Domain::startSession(int sessionId) { int retval(-1); LockGuard guard(_sessionLock); @@ -466,8 +345,7 @@ Domain::startSession(int sessionId) return retval; } -int -Domain::closeSession(int sessionId) +int Domain::closeSession(int sessionId) { _commitExecutor.sync(); int retval(-1); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index 2690f32471b..c1ff9157a6f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -8,39 +8,16 @@ namespace search::transactionlog { -class DomainConfig { -public: - using microseconds = std::chrono::microseconds; - DomainConfig(); - DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } - DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } - DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } - DomainConfig & setChunkAgeLimit(microseconds v) { _chunkAgeLimit = v; return *this; } - DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } - Encoding getEncoding() const { return _encoding; } - size_t getPartSizeLimit() const { return _partSizeLimit; } - size_t getChunkSizeLimit() const { return _chunkSizeLimit; } - microseconds getChunkAgeLimit() const { return _chunkAgeLimit; } - uint8_t getCompressionlevel() const { return _compressionLevel; } -private: - Encoding _encoding; - uint8_t _compressionLevel; - size_t _partSizeLimit; - size_t _chunkSizeLimit; - microseconds _chunkAgeLimit; -}; - struct PartInfo { SerialNumRange range; size_t numEntries; size_t byteSize; vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) - : range(range_in), - numEntries(numEntries_in), - byteSize(byteSize_in), - file(file_in) - {} + PartInfo(SerialNumRange range_in, size_t numEntries_in, + size_t byteSize_in, + vespalib::stringref file_in) + : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), + file(file_in) {} }; struct DomainInfo { @@ -58,22 +35,22 @@ struct DomainInfo { typedef std::map<vespalib::string, DomainInfo> DomainStats; -class Domain final : public FastOS_Runnable +class Domain { public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::ThreadExecutor; - Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool, - Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, + Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, + Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, const common::FileHeaderContext &fileHeaderContext); - ~Domain() override; + virtual ~Domain(); DomainInfo getDomainInfo() const; const vespalib::string & name() const { return _name; } bool erase(SerialNum to); - void commit(const Packet & packet, Writer::DoneCallback onDone); + void commit(const Packet & packet); int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn); SerialNum begin() const; @@ -100,27 +77,8 @@ public: return _sessionExecutor.execute(std::move(task)); } uint64_t size() const; - Domain & setConfig(const DomainConfig & cfg); + private: - void Run(FastOS_ThreadInterface *thisThread, void *arguments) override; - void commitIfStale(const vespalib::MonitorGuard & guard); - void commitIfFull(const vespalib::MonitorGuard & guard); - class Chunk { - public: - Chunk(); - ~Chunk(); - void add(const Packet & packet, Writer::DoneCallback onDone); - size_t sizeBytes() const { return _data.sizeBytes(); } - const Packet & getPacket() const { return _data; } - std::chrono::microseconds age() const; - private: - Packet _data; - std::vector<Writer::DoneCallback> _callBacks; - std::chrono::steady_clock::time_point _firstArrivalTime; - }; - - std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard); - void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard); SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; size_t byteSize(const vespalib::LockGuard & guard) const; @@ -137,26 +95,22 @@ private: using DomainPartList = std::map<int64_t, DomainPart::SP>; using DurationSeconds = std::chrono::duration<double>; - DomainConfig _config; - std::unique_ptr<Chunk> _currentChunk; - SerialNum _lastSerial; - FastOS_ThreadPool & _threadPool; - Executor & _commitExecutor; - Executor & _sessionExecutor; - std::atomic<int> _sessionId; - vespalib::Monitor _syncMonitor; - bool _pendingSync; - vespalib::string _name; - DomainPartList _parts; - vespalib::Lock _lock; - vespalib::Monitor _currentChunkMonitor; - vespalib::Lock _sessionLock; - SessionList _sessions; - DurationSeconds _maxSessionRunTime; - vespalib::string _baseDir; + DomainPart::Crc _defaultCrcType; + Executor & _commitExecutor; + Executor & _sessionExecutor; + std::atomic<int> _sessionId; + vespalib::Monitor _syncMonitor; + bool _pendingSync; + vespalib::string _name; + uint64_t _domainPartSize; + DomainPartList _parts; + vespalib::Lock _lock; + vespalib::Lock _sessionLock; + SessionList _sessions; + DurationSeconds _maxSessionRunTime; + vespalib::string _baseDir; const common::FileHeaderContext &_fileHeaderContext; - bool _markedDeleted; - FastOS_ThreadInterface * _self; + bool _markedDeleted; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index f140fb93b96..35bdc71c963 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -2,6 +2,7 @@ #include "domainpart.h" #include <vespa/vespalib/util/crc.h> +#include <vespa/vespalib/xxhash/xxhash.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/data/fileheader.h> #include <vespa/searchlib/common/fileheadercontext.h> @@ -26,26 +27,37 @@ namespace search::transactionlog { namespace { -constexpr size_t TARGET_PACKET_SIZE = 0x3f000; +void +handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); string -handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, - SerialNumRange range, int bufLen) __attribute__ ((noinline)); +handleWriteError(const char *text, + FastOS_FileInterface &file, + int64_t lastKnownGoodPos, + const Packet::Entry &entry, + int bufLen) __attribute__ ((noinline)); bool -handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, - int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline)); +handleReadError(const char *text, + FastOS_FileInterface &file, + ssize_t len, + ssize_t rlen, + int64_t lastKnownGoodPos, + bool allowTruncate) __attribute__ ((noinline)); -void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); -void addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline)); -bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); +bool +addPacket(Packet &packet, + const Packet::Entry &e) __attribute__ ((noinline)); -void +bool +tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); + +bool addPacket(Packet &packet, const Packet::Entry &e) { LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes", e.serial(), e.type(), e.data().size(), packet.size(), packet.sizeBytes()); - packet.add(e); + return ! packet.add(e); } void @@ -60,18 +72,21 @@ handleSync(FastOS_FileInterface &file) } string -handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, - SerialNumRange range, int bufLen) +handleWriteError(const char *text, + FastOS_FileInterface &file, + int64_t lastKnownGoodPos, + const Packet::Entry &entry, + int bufLen) { string last(FastOS_File::getLastErrorString()); - string e(make_string("%s. File '%s' at position %" PRId64 " for entries [%zu, %zu] of length %u. " - "OS says '%s'. Rewind to last known good position %zu.", - text, file.GetFileName(), file.GetPosition(), range.from(), range.to(), bufLen, + string e(make_string("%s. File '%s' at position %" PRId64 " for entry %" PRIu64 " of length %u. " + "OS says '%s'. Rewind to last known good position %" PRId64 ".", + text, file.GetFileName(), file.GetPosition(), entry.serial(), bufLen, last.c_str(), lastKnownGoodPos)); LOG(error, "%s", e.c_str()); if ( ! file.SetPosition(lastKnownGoodPos) ) { last = FastOS_File::getLastErrorString(); - throw runtime_error(make_string("Failed setting position %zu of file '%s' of size %zd : OS says '%s'", + throw runtime_error(make_string("Failed setting position %" PRId64 " of file '%s' of size %" PRId64 ": OS says '%s'", lastKnownGoodPos, file.GetFileName(), file.GetSize(), last.c_str())); } handleSync(file); @@ -103,8 +118,12 @@ tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) } bool -handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, - int64_t lastKnownGoodPos, bool allowTruncate) +handleReadError(const char *text, + FastOS_FileInterface &file, + ssize_t len, + ssize_t rlen, + int64_t lastKnownGoodPos, + bool allowTruncate) { bool retval(true); if (rlen != -1) { @@ -159,43 +178,6 @@ handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize } -Packet -DomainPart::readPacket(FastOS_FileInterface & transLog, SerialNumRange wanted, size_t targetSize, bool allowTruncate) { - Alloc buf; - Packet packet(targetSize); - int64_t fSize(transLog.GetSize()); - int64_t currPos(transLog.GetPosition()); - for(size_t i(0); (packet.sizeBytes() < targetSize) && (currPos < fSize) && (packet.range().to() < wanted.to()); i++) { - IChunk::UP chunk; - if (read(transLog, chunk, buf, allowTruncate)) { - if (chunk) { - try { - for (const Packet::Entry & e : chunk->getEntries()) { - if ((wanted.from() < e.serial()) && (e.serial() <= wanted.to())) { - addPacket(packet, e); - } - } - } catch (const std::exception & ex) { - throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } else { - throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } else { - if (transLog.GetSize() != fSize) { - fSize = transLog.GetSize(); - } else { - throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", - transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); - } - } - currPos = transLog.GetPosition(); - } - return packet; -} - int64_t DomainPart::buildPacketMapping(bool allowTruncate) { @@ -226,35 +208,66 @@ DomainPart::buildPacketMapping(bool allowTruncate) handleReadError("file header", transLog, 0, FileHeader::getMinSize(), 0, allowTruncate); } } - const SerialNumRange all(0, std::numeric_limits<SerialNum>::max()); while ((currPos < fSize)) { - const int64_t firstPos(currPos); - Packet packet = readPacket(transLog, all, TARGET_PACKET_SIZE, allowTruncate); - if (!packet.empty()) { - _sz += packet.size(); - const SerialNum firstSerial = packet.range().from(); - if (currPos == _headerLen) { - _range.from(firstSerial); + Packet packet; + SerialNum firstSerial(0); + SerialNum lastSerial(0); + int64_t firstPos(currPos); + bool full(false); + Alloc buf; + for(size_t i(0); !full && (currPos < fSize); i++) { + Packet::Entry e; + if (read(transLog, e, buf, allowTruncate)) { + if (e.valid()) { + if (i == 0) { + firstSerial = e.serial(); + if (currPos == _headerLen) { + _range.from(firstSerial); + } + } + try { + full = addPacket(packet, e); + if ( ! full ) { + lastSerial = e.serial(); + currPos = transLog.GetPosition(); + _sz++; + } else { + transLog.SetPosition(currPos); + } + } catch (const std::exception & ex) { + throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } else { + throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } + } else { + if (transLog.GetSize() != fSize) { + fSize = transLog.GetSize(); + } else { + throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + transLog.GetFileName(), fSize, currPos, transLog.GetPosition())); + } } - _range.to(packet.range().to()); - _packets.insert(std::make_pair(firstSerial, std::move(packet))); + } + packet.close(); + if (!packet.empty()) { + _packets[firstSerial] = packet; + _range.to(lastSerial); { LockGuard guard(_lock); _skipList.push_back(SkipInfo(firstSerial, firstPos)); } - } else { - fSize = transLog.GetSize(); } - currPos = transLog.GetPosition(); } transLog.Close(); return currPos; } -DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, - uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) : - _encoding(encoding), - _compressionLevel(compressionLevel), +DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Crc defaultCrc, + const FileHeaderContext &fileHeaderContext, bool allowTruncate) : + _defaultCrc(defaultCrc), _lock(), _fileLock(), _range(s), @@ -398,12 +411,10 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) //LOG(spam, //"Pos(%d) Len(%d), Lim(%d), Remaining(%d)", //h.getPos(), h.getLength(), h.getLimit(), h.getRemaining()); - IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel); Packet::Entry entry; entry.deserialize(h); if (_range.to() < entry.serial()) { - chunk->add(entry); - write(*_transLog, *chunk); + write(*_transLog, entry); _sz++; _range.to(entry.serial()); } else { @@ -417,15 +428,17 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet) if ( ! _packets.empty() ) { Packet & lastPacket = _packets.rbegin()->second; if (lastPacket.sizeBytes() < 0xf000) { - lastPacket.merge(packet); - merged = true; + if ( ! (merged = lastPacket.merge(packet)) ) { + LOG(error, "Failed merging packet [%" PRIu64 ", %" PRIu64 "] with [%" PRIu64 ", %" PRIu64 "]", + lastPacket.range().from(), lastPacket.range().to(), + packet.range().from(), packet.range().to()); + } } } if (! merged ) { - _packets.insert(std::make_pair(firstSerial, std::move(packet))); + _packets[firstSerial] = packet; _skipList.push_back(SkipInfo(firstSerial, firstPos)); } - sync(); } void DomainPart::sync() @@ -493,17 +506,23 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) if (e.serial() <= r.to()) { LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes", e.serial(), e.type(), e.data().size(), newPacket.size(), newPacket.sizeBytes()); - newPacket.add(e); - r.from(e.serial()); + if (newPacket.add(e)) { + r.from(e.serial()); + } else { + throw runtime_error("Could not add entry to packet. Here is some mumbo jumbo. Fix."); + } } else { // Force breakout on visiting empty interval. r.from(r.to()); } } } - packet = std::move(newPacket); + newPacket.close(); + packet = newPacket; retval = next != _packets.end(); } + } else { + packet.close(); } } else { /// File has been closed must continue from file. @@ -520,86 +539,131 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet) if ( ! file.IsOpened() ) { retval = openAndFind(file, r.from() + 1); } - if ( ! retval) { - return false; - } - - packet = readPacket(file, r, TARGET_PACKET_SIZE, false); - if (!packet.empty()) { - r.from(packet.range().to()); + if (retval) { + Packet newPacket; + Alloc buf; + for (bool full(false);!full && retval && (r.from() < r.to());) { + Packet::Entry e; + int64_t fPos = file.GetPosition(); + retval = read(file, e, buf, false); + if (retval && + e.valid() && + (r.from() < e.serial()) && + (e.serial() <= r.to())) { + try { + full = addPacket(newPacket, e); + } catch (const std::exception & ex) { + throw runtime_error(make_string("%s : Failed creating packet for visit %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")", + ex.what(), file.GetFileName(), file.GetSize(), fPos, file.GetPosition())); + } + if ( !full ) { + r.from(e.serial()); + } else { + if ( ! file.SetPosition(fPos) ) { + throw runtime_error(make_string("Failed setting read position for file '%s' of size %" PRId64 " from %" PRId64 " to %" PRId64 ".", + file.GetFileName(), file.GetSize(), file.GetPosition(), fPos)); + } + } + } + } + newPacket.close(); + packet = newPacket; } - return !packet.empty(); + return retval; } void -DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk) +DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) { - nbostream os; - size_t begin = os.wp(); - os << _encoding.getRaw(); - os << uint32_t(0); - Encoding realEncoding = chunk.encode(os); - size_t end = os.wp(); - os.wp(0); - os << realEncoding.getRaw(); - os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t))); - os.wp(end); int64_t lastKnownGoodPos(file.GetPosition()); + int32_t crc(0); + uint32_t len(entry.serializedSize() + sizeof(crc)); + nbostream os; + os << static_cast<uint8_t>(_defaultCrc); + os << len; + size_t start(os.size()); + entry.serialize(os); + size_t end(os.size()); + crc = calcCrc(_defaultCrc, os.c_str()+start, end - start); + os << crc; + size_t osSize = os.size(); + assert(osSize == len + sizeof(len) + sizeof(uint8_t)); LockGuard guard(_writeLock); - if ( ! file.CheckedWrite(os.c_str(), os.size()) ) { - throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, chunk.range(), os.size())); + if ( ! file.CheckedWrite(os.c_str(), osSize) ) { + throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, entry, end - start)); } - _writtenSerial = chunk.range().to(); - _byteSize.store(lastKnownGoodPos + os.size(), std::memory_order_release); + _writtenSerial = entry.serial(); + _byteSize.store(lastKnownGoodPos + osSize, std::memory_order_release); } bool -DomainPart::read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc & buf, bool allowTruncate) +DomainPart::read(FastOS_FileInterface &file, + Packet::Entry &entry, + Alloc & buf, + bool allowTruncate) { + bool retval(true); char tmp[5]; int64_t lastKnownGoodPos(file.GetPosition()); size_t rlen = file.Read(tmp, sizeof(tmp)); nbostream his(tmp, sizeof(tmp)); - uint8_t encoding(-1); + uint8_t version(-1); uint32_t len(0); - his >> encoding >> len; - if (rlen != sizeof(tmp)) { - return (rlen == 0) - ? true - : handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate); - } - - try { - chunk = IChunk::create(encoding); - } catch (const std::exception & e) { - string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," - " got %d from '%s' at position %ld", - encoding, file.GetFileName(), lastKnownGoodPos)); - if ((encoding == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { - LOG(warning, "%s", msg.c_str()); - return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); + his >> version >> len; + if ((retval = (rlen == sizeof(tmp)))) { + if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) { + string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2'," + " got %d from '%s' at position %ld", + version, file.GetFileName(), lastKnownGoodPos)); + if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { + LOG(warning, "%s", msg.c_str()); + return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); + } else { + throw runtime_error(msg); + } + } + if (len > buf.size()) { + Alloc::alloc(len).swap(buf); + } + rlen = file.Read(buf.get(), len); + retval = rlen == len; + if (!retval) { + retval = handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate); } else { - throw runtime_error(msg); + nbostream_longlivedbuf is(buf.get(), len); + entry.deserialize(is); + int32_t crc(0); + is >> crc; + int32_t crcVerify(calcCrc(static_cast<Crc>(version), buf.get(), len - sizeof(crc))); + if (crc != crcVerify) { + throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d", + file.GetFileName(), file.GetPosition() - len - sizeof(len), + static_cast<int>(len), static_cast<int>(crcVerify), static_cast<int>(crc))); + } + } + } else { + if (rlen == 0) { + // Eof + } else { + retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate); } } - if (len > buf.size()) { - Alloc::alloc(len).swap(buf); - } - rlen = file.Read(buf.get(), len); - if (rlen != len) { - return handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate); - } - try { - nbostream_longlivedbuf is(buf.get(), len); - chunk->decode(is); - } catch (const std::exception & e) { - throw runtime_error(make_string("Got exception during decoding of packet '%s' from file '%s' (len pos=%" PRId64 ", len=%d)", - e.what(), file.GetFileName(), file.GetPosition() - len - sizeof(len), static_cast<int>(len))); - } + return retval; +} - return true; +int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz) +{ + if (version == xxh64) { + return static_cast<int32_t>(XXH64(buf, sz, 0ll)); + } else if (version == ccitt_crc32) { + vespalib::crc_32_type calculator; + calculator.process_bytes(buf, sz); + return calculator.checksum(); + } else { + abort(); + } } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 5256b731125..59d0df6df94 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -2,7 +2,6 @@ #pragma once #include "common.h" -#include "ichunk.h" #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/memory.h> #include <map> @@ -20,9 +19,13 @@ private: DomainPart& operator=(const DomainPart &); public: + enum Crc { + ccitt_crc32=1, + xxh64=2 + }; typedef std::shared_ptr<DomainPart> SP; - DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, - uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); + DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc, + const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); ~DomainPart(); @@ -46,13 +49,13 @@ public: } bool isClosed() const; private: - using Alloc = vespalib::alloc::Alloc; bool openAndFind(FastOS_FileInterface &file, const SerialNum &from); int64_t buildPacketMapping(bool allowTruncate); - static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate); - static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate); - void write(FastOS_FileInterface &file, const IChunk & entry); + static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate); + + void write(FastOS_FileInterface &file, const Packet::Entry &entry); + static int32_t calcCrc(Crc crc, const void * buf, size_t len); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo @@ -74,22 +77,21 @@ private: }; typedef std::vector<SkipInfo> SkipList; typedef std::map<SerialNum, Packet> PacketList; - const Encoding _encoding; - const uint8_t _compressionLevel; - vespalib::Lock _lock; - vespalib::Lock _fileLock; - SerialNumRange _range; - size_t _sz; + const Crc _defaultCrc; + vespalib::Lock _lock; + vespalib::Lock _fileLock; + SerialNumRange _range; + size_t _sz; std::atomic<uint64_t> _byteSize; - PacketList _packets; - vespalib::string _fileName; + PacketList _packets; + vespalib::string _fileName; std::unique_ptr<FastOS_FileInterface> _transLog; - SkipList _skipList; - uint32_t _headerLen; - vespalib::Lock _writeLock; + SkipList _skipList; + uint32_t _headerLen; + vespalib::Lock _writeLock; // Protected by _writeLock - SerialNum _writtenSerial; - SerialNum _syncedSerial; + SerialNum _writtenSerial; + SerialNum _syncedSerial; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp deleted file mode 100644 index af7d396c0e4..00000000000 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "chunks.h" -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/util/crc.h> -#include <vespa/vespalib/xxhash/xxhash.h> -#include <cassert> - -using std::runtime_error; -using std::make_unique; -using vespalib::make_string; -using vespalib::nbostream_longlivedbuf; -using vespalib::compression::CompressionConfig; - -namespace search::transactionlog { - -Encoding::Encoding(Crc crc, Compression compression) - : _raw(crc | (compression >> 2)) -{ - assert(crc <= Crc::xxh64); - assert(compression <= Compression::lz4); -} - -IChunk::~IChunk() = default; - -void -IChunk::add(const Packet::Entry & entry) { - _entries.emplace_back(entry); -} - -SerialNumRange -IChunk::range() const { - return _entries.empty() - ? SerialNumRange() - : SerialNumRange(_entries.front().serial(), _entries.back().serial()); -} - -void -IChunk::deserializeEntries(nbostream & is) { - while (is.good() && !is.empty()) { - Packet::Entry e; - e.deserialize(is); - add(e); - } -} - -void -IChunk::serializeEntries(nbostream & os) const { - for (const auto & e : _entries) { - e.serialize(os); - } -} - -Encoding -IChunk::encode(nbostream & os) const { - return onEncode(os); -} - -void -IChunk::decode(nbostream & is) { - onDecode(is); -} - -IChunk::UP -IChunk::create(uint8_t chunkType) { - return create(Encoding(chunkType), 9); -} -IChunk::UP -IChunk::create(Encoding encoding, uint8_t compressionLevel) { - switch (encoding.getCrc()) { - case Encoding::Crc::xxh64: - switch (encoding.getCompression()) { - case Encoding::Compression::none: - return make_unique<XXH64None>(); - case Encoding::Compression::lz4: - return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel); - case Encoding::Compression::zstd: - return make_unique<XXH64Compressed>(CompressionConfig::ZSTD, compressionLevel); - default: - return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel); - } - case Encoding::Crc::ccitt_crc32: - switch (encoding.getCompression()) { - case Encoding::Compression::none: - return make_unique<CCITTCRC32None>(); - default: - throw runtime_error(make_string("Unhandled compression type '%d' for ccitt_crc32 compression", - encoding.getCompression())); - } - default: - throw runtime_error(make_string("Unhandled crc type '%d'", encoding.getCrc())); - } -} - -int32_t Encoding::calcCrc(Crc version, const void * buf, size_t sz) -{ - if (version == xxh64) { - return static_cast<int32_t>(XXH64(buf, sz, 0ll)); - } else if (version == ccitt_crc32) { - vespalib::crc_32_type calculator; - calculator.process_bytes(buf, sz); - return calculator.checksum(); - } else { - abort(); - } -} - -} diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h deleted file mode 100644 index 0e913703468..00000000000 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "common.h" -#include <memory> - -namespace vespalib { class nbostream; } - -namespace search::transactionlog { - -class Encoding { -public: - enum Crc { - nocrc = 0, - ccitt_crc32 = 1, - xxh64 = 2 - }; - enum Compression { - none = 0, - lz4 = 1, - zstd = 2 - }; - Encoding(uint8_t raw) : _raw(raw) {} - Encoding(Crc crc, Compression compression); - Crc getCrc() const { return Crc(_raw & 0x3); } - Compression getCompression() const { return Compression((_raw >> 2) & 0xf); } - static int32_t calcCrc(Crc version, const void * buf, size_t sz); - uint8_t getRaw() const { return _raw; } -private: - uint8_t _raw; -}; - -class IChunk { -public: - using UP = std::unique_ptr<IChunk>; - using Entries = std::vector<Packet::Entry>; - using nbostream = vespalib::nbostream; - using ConstBufferRef = vespalib::ConstBufferRef; - virtual ~IChunk(); - const Entries & getEntries() const { return _entries; } - void add(const Packet::Entry & entry); - Encoding encode(nbostream & os) const; - void decode(nbostream & buf); - static UP create(uint8_t chunkType); - static UP create(Encoding chunkType, uint8_t compressionLevel); - SerialNumRange range() const; -protected: - virtual Encoding onEncode(nbostream & os) const = 0; - virtual void onDecode(nbostream & is) = 0; - void deserializeEntries(nbostream & is); - void serializeEntries(nbostream & os) const; -private: - Entries _entries; -}; - -} diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index 56aaa162485..cbcbc68fdff 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -36,7 +36,7 @@ Session::VisitTask::run() bool Session::visit(FastOS_FileInterface & file, DomainPart & dp) { - Packet packet(size_t(-1)); + Packet packet; bool more(false); if (dp.isClosed()) { more = dp.visit(file, _range, packet); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 71b5b85c20f..4c3c5609a93 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,12 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" -#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/fnet/frt/supervisor.h> #include <fstream> -#include <thread> #include <vespa/log/log.h> LOG_SETUP(".transactionlog.server"); @@ -15,9 +13,6 @@ using vespalib::make_string; using vespalib::stringref; using vespalib::IllegalArgumentException; using search::common::FileHeaderContext; -using std::make_shared; -using std::runtime_error; -using namespace std::chrono_literals; namespace search::transactionlog { @@ -31,16 +26,21 @@ class SyncHandler : public FNET_Task SerialNum _syncTo; public: - SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain, - const TransLogServer::Session::SP &session, SerialNum syncTo); + SyncHandler(FRT_Supervisor *supervisor, + FRT_RPCRequest *req,const Domain::SP &domain, + const TransLogServer::Session::SP &session, + SerialNum syncTo); ~SyncHandler(); void PerformTask() override; }; -SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, - const TransLogServer::Session::SP &session, SerialNum syncTo) +SyncHandler::SyncHandler(FRT_Supervisor *supervisor, + FRT_RPCRequest *req, + const Domain::SP &domain, + const TransLogServer::Session::SP &session, + SerialNum syncTo) : FNET_Task(supervisor->GetScheduler()), _req(*req), _domain(domain), @@ -50,7 +50,9 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const } -SyncHandler::~SyncHandler() = default; +SyncHandler::~SyncHandler() +{ +} void @@ -75,25 +77,25 @@ SyncHandler::PerformTask() TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, - DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none)) - .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 100us)) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4) + const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, DomainPart::Crc::xxh64) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads) + const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize, + size_t maxThreads, DomainPart::Crc defaultCrcType) : FRT_Invokable(), _name(name), _baseDir(baseDir), - _domainConfig(cfg), + _domainPartSize(domainPartSize), + _defaultCrcType(defaultCrcType), _commitExecutor(maxThreads, 128*1024), _sessionExecutor(maxThreads, 128*1024), - _threadPool(0x20000), + _threadPool(8192, 1), _supervisor(std::make_unique<FRT_Supervisor>()), _domains(), _reqQ(), @@ -108,8 +110,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con domainDir >> domainName; if ( ! domainName.empty()) { try { - auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, - _sessionExecutor, cfg,_fileHeaderContext); + auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, + _domainPartSize, _defaultCrcType,_fileHeaderContext); _domains[domain->name()] = domain; } catch (const std::exception & e) { LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what()); @@ -126,17 +128,17 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con listenOk = true; } else { LOG(warning, "Failed listening at port %s trying for %d seconds more.", listenSpec, i); - std::this_thread::sleep_for(1s); + FastOS_Thread::Sleep(1000); } } if ( ! listenOk ) { - throw runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec)); + throw std::runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec)); } } else { - throw runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno)); + throw std::runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno)); } } else { - throw runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); + throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno)); } start(_threadPool); } @@ -152,16 +154,14 @@ TransLogServer::~TransLogServer() _supervisor->ShutDown(true); } -bool -TransLogServer::onStop() +bool TransLogServer::onStop() { LOG(info, "Stopping TLS"); _reqQ.push(NULL); return true; } -void -TransLogServer::run() +void TransLogServer::run() { FRT_RPCRequest *req(NULL); bool hasPacket(false); @@ -200,12 +200,11 @@ TransLogServer::run() } } logMetric(); - } while (running() && !(hasPacket && (req == nullptr))); + } while (running() && !(hasPacket && (req == NULL))); LOG(info, "TLS Stopped"); } -void -TransLogServer::logMetric() const +void TransLogServer::logMetric() const { Guard domainGuard(_lock); for (DomainList::const_iterator it(_domains.begin()), mt(_domains.end()); it != mt; it++) { @@ -216,17 +215,6 @@ TransLogServer::logMetric() const } } - -TransLogServer & -TransLogServer::setDomainConfig(const DomainConfig & cfg) { - Guard domainGuard(_lock); - _domainConfig = cfg; - for(auto &domain: _domains) { - domain.second->setConfig(cfg); - } - return *this; -} - DomainStats TransLogServer::getDomainStats() const { @@ -261,8 +249,7 @@ TransLogServer::findDomain(const stringref &domainName) return domain; } -void -TransLogServer::exportRPC(FRT_Supervisor & supervisor) +void TransLogServer::exportRPC(FRT_Supervisor & supervisor) { _supervisor->SetSessionInitHook(FRT_METHOD(TransLogServer::initSession), this); _supervisor->SetSessionFiniHook(FRT_METHOD(TransLogServer::finiSession), this); @@ -349,8 +336,7 @@ TransLogServer::exportRPC(FRT_Supervisor & supervisor) rb.ReturnDesc("syncedto", "Entry synced to"); } -void -TransLogServer::createDomain(FRT_RPCRequest *req) +void TransLogServer::createDomain(FRT_RPCRequest *req) { uint32_t retval(0); FRT_Values & params = *req->GetParams(); @@ -363,8 +349,8 @@ TransLogServer::createDomain(FRT_RPCRequest *req) Domain::SP domain(findDomain(domainName)); if ( !domain ) { try { - domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor, - _sessionExecutor, _domainConfig, _fileHeaderContext); + domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, + _domainPartSize, _defaultCrcType, _fileHeaderContext); { Guard domainGuard(_lock); _domains[domain->name()] = domain; @@ -380,8 +366,7 @@ TransLogServer::createDomain(FRT_RPCRequest *req) ret.AddInt32(retval); } -void -TransLogServer::deleteDomain(FRT_RPCRequest *req) +void TransLogServer::deleteDomain(FRT_RPCRequest *req) { uint32_t retval(0); vespalib::string msg("ok"); @@ -420,8 +405,7 @@ TransLogServer::deleteDomain(FRT_RPCRequest *req) ret.AddString(msg.c_str()); } -void -TransLogServer::openDomain(FRT_RPCRequest *req) +void TransLogServer::openDomain(FRT_RPCRequest *req) { uint32_t retval(0); FRT_Values & params = *req->GetParams(); @@ -438,8 +422,7 @@ TransLogServer::openDomain(FRT_RPCRequest *req) ret.AddInt32(retval); } -void -TransLogServer::listDomains(FRT_RPCRequest *req) +void TransLogServer::listDomains(FRT_RPCRequest *req) { FRT_Values & ret = *req->GetReturn(); LOG(debug, "listDomains()"); @@ -454,8 +437,7 @@ TransLogServer::listDomains(FRT_RPCRequest *req) ret.AddString(domains.c_str()); } -void -TransLogServer::domainStatus(FRT_RPCRequest *req) +void TransLogServer::domainStatus(FRT_RPCRequest *req) { FRT_Values & params = *req->GetParams(); FRT_Values & ret = *req->GetReturn(); @@ -475,20 +457,18 @@ TransLogServer::domainStatus(FRT_RPCRequest *req) } } -void -TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) +void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) { (void) done; Domain::SP domain(findDomain(domainName)); if (domain) { - domain->commit(packet, std::move(done)); + domain->commit(packet); } else { throw IllegalArgumentException("Could not find domain " + domainName); } } -void -TransLogServer::domainCommit(FRT_RPCRequest *req) +void TransLogServer::domainCommit(FRT_RPCRequest *req) { FRT_Values & params = *req->GetParams(); FRT_Values & ret = *req->GetReturn(); @@ -498,9 +478,7 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) if (domain) { Packet packet(params[1]._data._buf, params[1]._data._len); try { - vespalib::Gate gate; - domain->commit(packet, make_shared<GateCallback>(gate)); - gate.await(); + domain->commit(packet); ret.AddInt32(0); ret.AddString("ok"); } catch (const std::exception & e) { @@ -513,8 +491,7 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) } } -void -TransLogServer::domainVisit(FRT_RPCRequest *req) +void TransLogServer::domainVisit(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -531,8 +508,7 @@ TransLogServer::domainVisit(FRT_RPCRequest *req) ret.AddInt32(retval); } -void -TransLogServer::domainSessionRun(FRT_RPCRequest *req) +void TransLogServer::domainSessionRun(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -548,15 +524,13 @@ TransLogServer::domainSessionRun(FRT_RPCRequest *req) ret.AddInt32(retval); } -void -TransLogServer::relayToThreadRPC(FRT_RPCRequest *req) +void TransLogServer::relayToThreadRPC(FRT_RPCRequest *req) { req->Detach(); _reqQ.push(req); } -void -TransLogServer::domainSessionClose(FRT_RPCRequest *req) +void TransLogServer::domainSessionClose(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -573,8 +547,7 @@ TransLogServer::domainSessionClose(FRT_RPCRequest *req) ret.AddInt32(retval); } -void -TransLogServer::domainPrune(FRT_RPCRequest *req) +void TransLogServer::domainPrune(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 83627d2d3e3..d78d3d39887 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -22,15 +22,16 @@ public: typedef std::shared_ptr<TransLogServer> SP; TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); + const common::FileHeaderContext &fileHeaderContext, + uint64_t domainPartSize, size_t maxThreads, DomainPart::Crc defaultCrc); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg); + const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; + void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; - TransLogServer & setDomainConfig(const DomainConfig & cfg); class Session { @@ -78,7 +79,8 @@ private: vespalib::string _name; vespalib::string _baseDir; - DomainConfig _domainConfig; + const uint64_t _domainPartSize; + const DomainPart::Crc _defaultCrcType; vespalib::ThreadStackExecutor _commitExecutor; vespalib::ThreadStackExecutor _sessionExecutor; FastOS_ThreadPool _threadPool; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index de3d72331d6..ff4a402b438 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -7,17 +7,12 @@ LOG_SETUP(".translogserverapp"); using search::common::FileHeaderContext; -using namespace std::chrono_literals; namespace search::transactionlog { -using LockGuard = std::lock_guard<std::mutex>; -using std::make_unique; - TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, const FileHeaderContext & fileHeaderContext) - : _lock(), - _tls(), + : _tls(), _tlsConfig(), _tlsConfigFetcher(tlsConfigUri.getContext()), _fileHeaderContext(fileHeaderContext) @@ -28,58 +23,24 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, namespace { - -Encoding::Crc -getCrc(searchlib::TranslogserverConfig::Crcmethod type) +DomainPart::Crc getCrc(searchlib::TranslogserverConfig::Crcmethod crcType) { - switch (type) { + switch (crcType) { case searchlib::TranslogserverConfig::ccitt_crc32: - return Encoding::Crc::ccitt_crc32; + return DomainPart::ccitt_crc32; case searchlib::TranslogserverConfig::xxh64: - return Encoding::Crc::xxh64; - } - return Encoding::Crc::xxh64; -} - -Encoding::Compression -getCompression(searchlib::TranslogserverConfig::Compression::Type type) -{ - switch (type) { - case searchlib::TranslogserverConfig::Compression::NONE: - return Encoding::Compression::none; - case searchlib::TranslogserverConfig::Compression::LZ4: - return Encoding::Compression::lz4; - case searchlib::TranslogserverConfig::Compression::ZSTD: - return Encoding::Compression::zstd; + return DomainPart::xxh64; } - return Encoding::Compression::lz4; -} - -Encoding -getEncoding(const searchlib::TranslogserverConfig & cfg) -{ - return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type)); -} - -DomainConfig -getDomainConfig(const searchlib::TranslogserverConfig & cfg) { - DomainConfig dcfg; - dcfg.setEncoding(getEncoding(cfg)) - .setCompressionLevel(cfg.compression.level) - .setPartSizeLimit(cfg.filesizemax) - .setChunkSizeLimit(cfg.chunk.sizelimit) - .setChunkAgeLimit(std::chrono::microseconds(int64_t(cfg.chunk.agelimit*1000000))); - return dcfg; + abort(); } } void TransLogServerApp::start() { - LockGuard guard(_lock); - auto c = _tlsConfig.get(); - _tls = make_unique<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, - getDomainConfig(*c), c->maxthreads); + std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get(); + _tls.reset(new TransLogServer(c->servername, c->listenport, c->basedir, _fileHeaderContext, + c->filesizemax, c->maxthreads, getCrc(c->crcmethod))); } TransLogServerApp::~TransLogServerApp() @@ -90,12 +51,8 @@ TransLogServerApp::~TransLogServerApp() void TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) { LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport); - LockGuard guard(_lock); _tlsConfig.set(cfg.release()); _tlsConfig.latch(); - if (_tls) { - _tls->setDomainConfig(getDomainConfig(*cfg)); - } } } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h index ea6d0158cec..35fa994d1e4 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h @@ -14,7 +14,6 @@ namespace search::transactionlog { class TransLogServerApp : public config::IFetcherCallback<searchlib::TranslogserverConfig> { private: - std::mutex _lock; TransLogServer::SP _tls; vespalib::PtrHolder<searchlib::TranslogserverConfig> _tlsConfig; config::ConfigFetcher _tlsConfigFetcher; |