diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-08-27 16:20:15 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-02 10:28:54 +0000 |
commit | aef40155b80cef617828af4895c0bec1c733b4e5 (patch) | |
tree | 0a6031f27acd486d4420621dfdde136b55530a37 | |
parent | d68713378b51b84a2d653bbb0cdbf00d78659473 (diff) |
Configure compression and chunk size
11 files changed, 544 insertions, 356 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 18b3a5c5d8e..44058d48d1e 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -49,6 +49,7 @@ using search::index::schema::CollectionType; using search::index::schema::DataType; using vespalib::makeLambdaTask; using search::transactionlog::TransLogServer; +using search::transactionlog::DomainConfig; using storage::spi::PartitionId; using storage::spi::RemoveResult; using storage::spi::Result; @@ -459,7 +460,7 @@ struct FeedHandlerFixture FeedHandler handler; FeedHandlerFixture() : _fileHeaderContext(), - tls("mytls", 9016, "mytlsdir", _fileHeaderContext, 0x10000), + tls("mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)), tlsSpec("tcp/localhost:9016"), sharedExecutor(1, 0x10000), writeService(sharedExecutor), diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 0dced597917..3c7b412eeaf 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -5,7 +5,6 @@ #include <vespa/vespalib/objects/identifiable.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/fastos/file.h> -#include <map> #include <vespa/log/log.h> LOG_SETUP("translogclient_test"); @@ -14,9 +13,24 @@ using namespace search; using namespace transactionlog; using namespace document; using namespace vespalib; +using namespace std::chrono_literals; using search::index::DummyFileHeaderContext; -vespalib::string myhex(const void * b, size_t sz) +namespace { + +bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); +TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); +bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); +void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); +void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); +uint32_t countFiles(const vespalib::string &dir); +void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); +bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); +void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains); +void verifyDomain(const vespalib::string & name); + +vespalib::string +myhex(const void * b, size_t sz) { static const char * hextab="0123456789ABCDEF"; const unsigned char * c = static_cast<const unsigned char *>(b); @@ -29,35 +43,6 @@ vespalib::string myhex(const void * b, size_t sz) return s; } -class Test : public vespalib::TestApp -{ -public: - int Main() override; -private: - bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); - TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); - bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); - void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); - void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); - uint32_t countFiles(const vespalib::string &dir); - void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); - bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); - bool partialUpdateTest(); - bool testVisitOverGeneratedDomain(); - bool testRemove(); - void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains); - void verifyDomain(const vespalib::string & name); - void testCrcVersions(); - bool testVisitOverPreExistingDomain(); - void testMany(); - void testErase(); - void testSync(); - void testTruncateOnShortRead(); - void testTruncateOnVersionMismatch(); -}; - -TEST_APPHOOK(Test); - class CallBackTest : public TransLogClient::Visitor::Callback { private: @@ -75,7 +60,8 @@ public: bool _eof; }; -RPC::Result CallBackTest::receive(const Packet & p) +RPC::Result +CallBackTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(h.peek(), h.size()).c_str()); @@ -101,7 +87,8 @@ public: size_t _value; }; -RPC::Result CallBackManyTest::receive(const Packet & p) +RPC::Result +CallBackManyTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); for(;h.size() > 0; _count++, _value++) { @@ -133,7 +120,8 @@ public: }; -RPC::Result CallBackUpdate::receive(const Packet & packet) +RPC::Result +CallBackUpdate::receive(const Packet & packet) { nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size()); while (h.size() > 0) { @@ -185,7 +173,8 @@ public: SerialNum _prevSerial; }; -RPC::Result CallBackStatsTest::receive(const Packet & p) +RPC::Result +CallBackStatsTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); for(;h.size() > 0; ++_count) { @@ -221,67 +210,10 @@ public: IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable); -bool Test::partialUpdateTest() -{ - bool retval(false); - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); - TransLogClient tls("tcp/localhost:18377"); - - TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); - TransLogClient::Session & session = *s1; - - TestIdentifiable du; - - nbostream os; - os << du; - - vespalib::ConstBufferRef bb(os.data(), os.size()); - LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); - Packet::Entry e(7, du.getClass().id(), bb); - Packet pa; - pa.add(e); - pa.close(); - ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size()))); - - CallBackUpdate ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(5, 7) ); - for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - ASSERT_TRUE( ca.map().size() == 1); - ASSERT_TRUE( ca.hasSerial(7) ); - - CallBackUpdate ca1; - TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); - ASSERT_TRUE(visitor1.get()); - ASSERT_TRUE( visitor1->visit(4, 5) ); - for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca1._eof ); - ASSERT_TRUE( ca1.map().size() == 0); - - CallBackUpdate ca2; - TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); - ASSERT_TRUE(visitor2.get()); - ASSERT_TRUE( visitor2->visit(5, 6) ); - for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca2._eof ); - ASSERT_TRUE( ca2.map().size() == 0); - - CallBackUpdate ca3; - TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); - ASSERT_TRUE(visitor3.get()); - ASSERT_TRUE( visitor3->visit(5, 1000) ); - for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca3._eof ); - ASSERT_TRUE( ca3.map().size() == 1); - ASSERT_TRUE( ca3.hasSerial(7) ); - - return retval; -} +constexpr size_t DEFAULT_PACKET_SIZE = 0xf000; -bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) +bool +createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) { bool retval(true); std::vector<vespalib::string> dir; @@ -298,43 +230,40 @@ bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, return retval; } -TransLogClient::Session::UP Test::openDomainTest(TransLogClient & tls, const vespalib::string & name) +TransLogClient::Session::UP +openDomainTest(TransLogClient & tls, const vespalib::string & name) { TransLogClient::Session::UP s1 = tls.open(name); ASSERT_TRUE (s1.get() != NULL); return s1; } -bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) +bool +fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) { bool retval(true); Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20)); Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20)); Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20)); - Packet a; - ASSERT_TRUE (a.add(e1)); - Packet b; - ASSERT_TRUE (b.add(e2)); - ASSERT_TRUE (b.add(e3)); - ASSERT_TRUE (!b.add(e1)); - a.close(); - b.close(); + Packet a(DEFAULT_PACKET_SIZE); + a.add(e1); + Packet b(DEFAULT_PACKET_SIZE); + b.add(e2); + b.add(e3); + EXPECT_FALSE(b.add(e1)); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size()))); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size()))); - try { - s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())); - ASSERT_TRUE(false); - } catch (const std::exception & e) { - EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what()); - } + EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())), + std::runtime_error, + "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."); EXPECT_EQUAL(a.size(), 1u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 1u); EXPECT_EQUAL(b.size(), 2u); EXPECT_EQUAL(b.range().from(), 2u); EXPECT_EQUAL(b.range().to(), 3u); - EXPECT_TRUE(a.merge(b)); + a.merge(b); EXPECT_EQUAL(a.size(), 3u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 3u); @@ -349,52 +278,82 @@ bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & return retval; } -void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) +void +fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) { size_t value(0); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet()); + std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - if ( ! p->add(e) ) { - p->close(); + p->add(e); + if (p->sizeBytes() > DEFAULT_PACKET_SIZE){ ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); - p.reset(new Packet()); - ASSERT_TRUE(p->add(e)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); } } - p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); } } +using Counter = std::atomic<size_t>; + +class CountDone : public IDestructorCallback { +public: + CountDone(Counter & inFlight) : _inFlight(inFlight) { ++_inFlight; } + ~CountDone() override { --_inFlight; } +private: + Counter & _inFlight; +}; + +void +fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries) +{ + size_t value(0); + Counter inFlight(0); + for(size_t i=0; i < numPackets; i++) { + std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); + for(size_t j=0; j < numEntries; j++, value++) { + Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); + p->add(e); + if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { + s1.commit(domain, *p, std::make_shared<CountDone>(inFlight)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); + } + } + s1.commit(domain, *p, std::make_shared<CountDone>(inFlight)); + LOG(info, "Inflight %ld", inFlight.load()); + } + while (inFlight.load() != 0) { + std::this_thread::sleep_for(10ms); + LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load()); + } + +} + void -Test::fillDomainTest(TransLogClient::Session * s1, - size_t numPackets, size_t numEntries, - size_t entrySize) +fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) { size_t value(0); std::vector<char> entryBuffer(entrySize); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet()); + std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size())); - if ( ! p->add(e) ) { - p->close(); + p->add(e); + if (p->sizeBytes() > DEFAULT_PACKET_SIZE){ ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); - p.reset(new Packet()); - ASSERT_TRUE(p->add(e)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); } } - p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); } } uint32_t -Test::countFiles(const vespalib::string &dir) +countFiles(const vespalib::string &dir) { uint32_t res = 0; FastOS_DirectoryScan dirScan(dir.c_str()); @@ -408,10 +367,8 @@ Test::countFiles(const vespalib::string &dir) return res; } - void -Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, - size_t numEntries) +checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) { SerialNum b(0), e(0); size_t c(0); @@ -421,8 +378,8 @@ Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, EXPECT_EQUAL(c, numEntries); } - -bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) +bool +visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) { bool retval(true); @@ -486,10 +443,31 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain) return tls.getDomainStats()[domain].maxSessionRunTime.count(); } -bool Test::testVisitOverGeneratedDomain() +void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains) { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, + DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4); + TransLogClient tls("tcp/localhost:18377"); + + createDomainTest(tls, name, preExistingDomains); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + fillDomainTest(s1.get(), name); +} + +void verifyDomain(const vespalib::string & name) { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogClient tls("tcp/localhost:18377"); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + visitDomainTest(tls, s1.get(), name); +} + +} + +TEST("testVisitOverGeneratedDomain") { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -501,42 +479,85 @@ bool Test::testVisitOverGeneratedDomain() double maxSessionRunTime = getMaxSessionRunTime(tlss, "test1"); LOG(info, "testVisitOverGeneratedDomain(): maxSessionRunTime=%f", maxSessionRunTime); EXPECT_GREATER(maxSessionRunTime, 0); - return true; } -void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains) -{ +TEST("testVisitOverPreExistingDomain") { + // Depends on Test::testVisitOverGeneratedDomain() DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, crcMethod); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); - createDomainTest(tls, name, preExistingDomains); + vespalib::string name("test1"); TransLogClient::Session::UP s1 = openDomainTest(tls, name); - fillDomainTest(s1.get(), name); + visitDomainTest(tls, s1.get(), name); } -void Test::verifyDomain(const vespalib::string & name) -{ +TEST("partialUpdateTest") { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); - visitDomainTest(tls, s1.get(), name); + + TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); + TransLogClient::Session & session = *s1; + + TestIdentifiable du; + + nbostream os; + os << du; + + vespalib::ConstBufferRef bb(os.data(), os.size()); + LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); + Packet::Entry e(7, du.getClass().id(), bb); + Packet pa(DEFAULT_PACKET_SIZE); + pa.add(e); + ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size()))); + + CallBackUpdate ca; + TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(5, 7) ); + for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + ASSERT_TRUE( ca.map().size() == 1); + ASSERT_TRUE( ca.hasSerial(7) ); + + CallBackUpdate ca1; + TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); + ASSERT_TRUE(visitor1.get()); + ASSERT_TRUE( visitor1->visit(4, 5) ); + for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca1._eof ); + ASSERT_TRUE( ca1.map().size() == 0); + + CallBackUpdate ca2; + TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); + ASSERT_TRUE(visitor2.get()); + ASSERT_TRUE( visitor2->visit(5, 6) ); + for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca2._eof ); + ASSERT_TRUE( ca2.map().size() == 0); + + CallBackUpdate ca3; + TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); + ASSERT_TRUE(visitor3.get()); + ASSERT_TRUE( visitor3->visit(5, 1000) ); + for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca3._eof ); + ASSERT_TRUE( ca3.map().size() == 1); + ASSERT_TRUE( ca3.hasSerial(7) ); } -void Test::testCrcVersions() -{ - createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0); - createAndFillDomain("xxh64", DomainPart::xxh64, 1); +TEST("testCrcVersions") { + createAndFillDomain("ccitt_crc32", Encoding::Crc::ccitt_crc32, 0); + createAndFillDomain("xxh64", Encoding::Crc::xxh64, 1); verifyDomain("ccitt_crc32"); verifyDomain("xxh64"); } -bool Test::testRemove() -{ +TEST("testRemove") { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test-delete"); @@ -545,21 +566,6 @@ bool Test::testRemove() fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); ASSERT_TRUE(tls.remove(name)); - - return true; -} - -bool Test::testVisitOverPreExistingDomain() -{ - // Depends on Test::testVisitOverGeneratedDomain() - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); - TransLogClient tls("tcp/localhost:18377"); - - vespalib::string name("test1"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); - visitDomainTest(tls, s1.get(), name); - return true; } namespace { @@ -600,18 +606,19 @@ assertStatus(TransLogClient::Session &s, } -void Test::testMany() -{ +TEST("test sending a lot of data") { const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + const vespalib::string MANY("many"); { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000) + .setChunkAgeLimit(100us)); TransLogClient tls("tcp/localhost:18377"); - createDomainTest(tls, "many", 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); + createDomainTest(tls, MANY, 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); @@ -630,7 +637,7 @@ void Test::testMany() } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); @@ -641,7 +648,28 @@ void Test::testMany() EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); + TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); + } + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + EXPECT_EQUAL(b, 1u); + EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); + CallBackManyTest ca(2); + TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -651,14 +679,67 @@ void Test::testMany() } } -void Test::testErase() -{ +TEST("test sending a lot of data async") { const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + const vespalib::string MANY("many-async"); + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000) + .setChunkAgeLimit(10ms)); + TransLogClient tls("tcp/localhost:18377"); + createDomainTest(tls, MANY, 1); + TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES); + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + + EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); + CallBackManyTest ca(2); + TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); + } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + EXPECT_EQUAL(b, 1u); + EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); + CallBackManyTest ca(2); + TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); + } +} + + + + +TEST("testErase") { + const unsigned int NUM_PACKETS = 1000; + const unsigned int NUM_ENTRIES = 100; + const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); @@ -667,7 +748,7 @@ void Test::testErase() } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); @@ -748,16 +829,13 @@ void Test::testErase() } } - -void -Test::testSync() -{ +TEST("testSync") { const unsigned int NUM_PACKETS = 3; const unsigned int NUM_ENTRIES = 4; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -770,10 +848,7 @@ Test::testSync() EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES); } - -void -Test::testTruncateOnVersionMismatch() -{ +TEST("test truncate on version mismatch") { const unsigned int NUM_PACKETS = 3; const unsigned int NUM_ENTRIES = 4; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; @@ -782,7 +857,7 @@ Test::testTruncateOnVersionMismatch() size_t countOld(0); DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -803,7 +878,7 @@ Test::testTruncateOnVersionMismatch() EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp))); EXPECT_TRUE(f.Close()); { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); @@ -815,9 +890,7 @@ Test::testTruncateOnVersionMismatch() } } -void -Test::testTruncateOnShortRead() -{ +TEST("test truncation after short read") { const unsigned int NUM_PACKETS = 17; const unsigned int NUM_ENTRIES = 1; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; @@ -829,7 +902,7 @@ Test::testTruncateOnShortRead() DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); @@ -845,7 +918,7 @@ Test::testTruncateOnShortRead() EXPECT_EQUAL(2u, countFiles(dir)); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); @@ -861,7 +934,7 @@ Test::testTruncateOnShortRead() trfile.Close(); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); @@ -871,28 +944,4 @@ Test::testTruncateOnShortRead() } } - -int Test::Main() -{ - TEST_INIT("translogclient_test"); - - if (_argc > 0) { - DummyFileHeaderContext::setCreator(_argv[0]); - } - testVisitOverGeneratedDomain(); - testVisitOverPreExistingDomain(); - testMany(); - testErase(); - partialUpdateTest(); - - testRemove(); - - testSync(); - - testTruncateOnShortRead(); - testTruncateOnVersionMismatch(); - - testCrcVersions(); - - TEST_DONE(); -} +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 013ca81dcc9..925f297bf48 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -698,7 +698,7 @@ TransLogStress::Main() // start transaction log server DummyFileHeaderContext fileHeaderContext; - TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize); + TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); TransLogClient client(tlsSpec); client.create(domain); diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index 74efe3fe68e..f822fc80fc1 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -5,7 +5,7 @@ namespace=searchlib listenport int default=13700 restart ## Max file size (50M) -filesizemax int default=50000000 restart +filesizemax int default=50000000 ## Server name to identify server. servername string default="tls" restart @@ -22,3 +22,17 @@ maxthreads int default=4 restart ##Default crc method used crcmethod enum {ccitt_crc32, xxh64} default=xxh64 + +## Control compression type. +compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=LZ4 + +## Control compression level +## LZ4 has normal range 1..9 while ZSTD has range 1..19 +## 9 is a reasonable default for both +compression.level int default=9 + +## How large a chunk can grow in memory before beeing flushed +chunk.sizelimit int default = 256000 # 256k + +## How long a chunk can reside in memory befor ebeeing flushed to disk. +chunk.agelimit double default = 0.010 # 10 milliseconds diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 5e7cfc74199..81066891601 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -4,11 +4,14 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/file.h> #include <algorithm> #include <thread> #include <vespa/log/log.h> +#include <vespa/vespalib/util/threadstackexecutor.h> + LOG_SETUP(".transactionlog.domain"); using vespalib::string; @@ -20,29 +23,43 @@ using vespalib::Monitor; using vespalib::MonitorGuard; using search::common::FileHeaderContext; using std::runtime_error; -using namespace std::chrono_literals; +using std::make_shared; namespace search::transactionlog { -Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, - Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, - const FileHeaderContext &fileHeaderContext) : - _defaultCrcType(defaultCrcType), - _commitExecutor(commitExecutor), - _sessionExecutor(sessionExecutor), - _sessionId(1), - _syncMonitor(), - _pendingSync(false), - _name(domainName), - _domainPartSize(domainPartSize), - _parts(), - _lock(), - _sessionLock(), - _sessions(), - _maxSessionRunTime(), - _baseDir(baseDir), - _fileHeaderContext(fileHeaderContext), - _markedDeleted(false) +VESPA_THREAD_STACK_TAG(domain_commit_executor); + +DomainConfig::DomainConfig() + : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), + _compressionLevel(9), + _partSizeLimit(0x10000000), // 256M + _chunkSizeLimit(0x40000), // 256k + _chunkAgeLimit(10ms) +{ } + +Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool, + Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, + const FileHeaderContext &fileHeaderContext) + : _config(cfg), + _lastSerial(0), + _threadPool(threadPool), + _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)), + _commitExecutor(commitExecutor), + _sessionExecutor(sessionExecutor), + _sessionId(1), + _syncMonitor(), + _pendingSync(false), + _name(domainName), + _parts(), + _lock(), + _currentChunkMonitor(), + _sessionLock(), + _sessions(), + _maxSessionRunTime(), + _baseDir(baseDir), + _fileHeaderContext(fileHeaderContext), + _markedDeleted(false), + _self(nullptr) { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { @@ -60,13 +77,22 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm } _sessionExecutor.sync(); if (_parts.empty() || _parts.crbegin()->second->isClosed()) { - _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false); + _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, false); vespalib::File::sync(dir()); } + _lastSerial = end(); +} + +Domain & +Domain::setConfig(const DomainConfig & cfg) { + _config = cfg; + return *this; } void Domain::addPart(int64_t partId, bool isLastPart) { - auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart); + auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, isLastPart); if (dp->size() == 0) { // Only last domain part is allowed to be truncated down to // empty size. @@ -285,18 +311,21 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } -void Domain::commit(const Packet & packet) +void +Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { + (void) onDone; DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); - if (dp->byteSize() > _domainPartSize) { + if (dp->byteSize() > _config.getPartSizeLimit()) { waitPendingSync(_syncMonitor, _pendingSync); triggerSyncNow(); waitPendingSync(_syncMonitor, _pendingSync); dp->close(); - dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _defaultCrcType, _fileHeaderContext, false); + dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getPartSizeLimit(), + _config.getCompressionlevel(), _fileHeaderContext, false); { LockGuard guard(_lock); _parts[entry.serial()] = dp; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index d6f964d5140..cc7f4ac5e4b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -4,20 +4,45 @@ #include "domainpart.h" #include "session.h" #include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/time.h> +#include <vespa/fastos/thread.h> #include <chrono> namespace search::transactionlog { +class DomainConfig { +public: + using duration = vespalib::duration; + DomainConfig(); + DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } + DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } + DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } + DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } + DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } + Encoding getEncoding() const { return _encoding; } + size_t getPartSizeLimit() const { return _partSizeLimit; } + size_t getChunkSizeLimit() const { return _chunkSizeLimit; } + duration getChunkAgeLimit() const { return _chunkAgeLimit; } + uint8_t getCompressionlevel() const { return _compressionLevel; } +private: + Encoding _encoding; + uint8_t _compressionLevel; + size_t _partSizeLimit; + size_t _chunkSizeLimit; + duration _chunkAgeLimit; +}; + struct PartInfo { SerialNumRange range; size_t numEntries; size_t byteSize; vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, - size_t byteSize_in, - vespalib::stringref file_in) - : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), - file(file_in) {} + PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) + : range(range_in), + numEntries(numEntries_in), + byteSize(byteSize_in), + file(file_in) + {} }; struct DomainInfo { @@ -40,17 +65,17 @@ class Domain public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::SyncableThreadExecutor; - Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, - Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, + Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool, + Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); - virtual ~Domain(); + ~Domain(); DomainInfo getDomainInfo() const; const vespalib::string & name() const { return _name; } bool erase(SerialNum to); - void commit(const Packet & packet); + void commit(const Packet & packet, Writer::DoneCallback onDone); int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest); SerialNum begin() const; @@ -77,7 +102,7 @@ public: return _sessionExecutor.execute(std::move(task)); } uint64_t size() const; - + Domain & setConfig(const DomainConfig & cfg); private: SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; @@ -95,22 +120,26 @@ private: using DomainPartList = std::map<int64_t, DomainPart::SP>; using DurationSeconds = std::chrono::duration<double>; - DomainPart::Crc _defaultCrcType; - Executor & _commitExecutor; - Executor & _sessionExecutor; - std::atomic<int> _sessionId; - vespalib::Monitor _syncMonitor; - bool _pendingSync; - vespalib::string _name; - uint64_t _domainPartSize; - DomainPartList _parts; - vespalib::Lock _lock; - vespalib::Lock _sessionLock; - SessionList _sessions; - DurationSeconds _maxSessionRunTime; - vespalib::string _baseDir; + DomainConfig _config; + SerialNum _lastSerial; + FastOS_ThreadPool & _threadPool; + std::unique_ptr<Executor> _singleCommiter; + Executor & _commitExecutor; + Executor & _sessionExecutor; + std::atomic<int> _sessionId; + vespalib::Monitor _syncMonitor; + bool _pendingSync; + vespalib::string _name; + DomainPartList _parts; + vespalib::Lock _lock; + vespalib::Monitor _currentChunkMonitor; + vespalib::Lock _sessionLock; + SessionList _sessions; + DurationSeconds _maxSessionRunTime; + vespalib::string _baseDir; const common::FileHeaderContext &_fileHeaderContext; - bool _markedDeleted; + bool _markedDeleted; + FastOS_ThreadInterface * _self; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 91599f8218a..6bd6d706d0a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -27,6 +27,11 @@ namespace search::transactionlog { namespace { +enum Crc { + ccitt_crc32=1, + xxh64=2 +}; + void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); @@ -176,6 +181,20 @@ handleReadError(const char *text, return retval; } +int32_t +calcCrc(Crc version, const void * buf, size_t sz) +{ + if (version == xxh64) { + return static_cast<int32_t>(XXH64(buf, sz, 0ll)); + } else if (version == ccitt_crc32) { + vespalib::crc_32_type calculator; + calculator.process_bytes(buf, sz); + return calculator.checksum(); + } else { + LOG_ABORT("should not be reached"); + } +} + } int64_t @@ -265,22 +284,23 @@ DomainPart::buildPacketMapping(bool allowTruncate) return currPos; } -DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Crc defaultCrc, - const FileHeaderContext &fileHeaderContext, bool allowTruncate) : - _defaultCrc(defaultCrc), - _lock(), - _fileLock(), - _range(s), - _sz(0), - _byteSize(0), - _packets(), - _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)), - _transLog(std::make_unique<FastOS_File>(_fileName.c_str())), - _skipList(), - _headerLen(0), - _writeLock(), - _writtenSerial(0), - _syncedSerial(0) +DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, + uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) + : _encoding(encoding), + _compressionLevel(compressionLevel), + _lock(), + _fileLock(), + _range(s), + _sz(0), + _byteSize(0), + _packets(), + _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)), + _transLog(std::make_unique<FastOS_File>(_fileName.c_str())), + _skipList(), + _headerLen(0), + _writeLock(), + _writtenSerial(0), + _syncedSerial(0) { if (_transLog->OpenReadOnly()) { int64_t currPos = buildPacketMapping(allowTruncate); @@ -582,12 +602,12 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) int32_t crc(0); uint32_t len(entry.serializedSize() + sizeof(crc)); nbostream os; - os << static_cast<uint8_t>(_defaultCrc); + os << static_cast<uint8_t>(Crc::xxh64); os << len; size_t start(os.size()); entry.serialize(os); size_t end(os.size()); - crc = calcCrc(_defaultCrc, os.data() + start, end - start); + crc = calcCrc(Crc::xxh64, os.data() + start, end - start); os << crc; size_t osSize = os.size(); assert(osSize == len + sizeof(len) + sizeof(uint8_t)); @@ -655,17 +675,4 @@ DomainPart::read(FastOS_FileInterface &file, return retval; } -int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz) -{ - if (version == xxh64) { - return static_cast<int32_t>(XXH64(buf, sz, 0ll)); - } else if (version == ccitt_crc32) { - vespalib::crc_32_type calculator; - calculator.process_bytes(buf, sz); - return calculator.checksum(); - } else { - LOG_ABORT("should not be reached"); - } -} - } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 59d0df6df94..f3a53c1e9a9 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -2,6 +2,7 @@ #pragma once #include "common.h" +#include "ichunk.h" #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/memory.h> #include <map> @@ -19,13 +20,9 @@ private: DomainPart& operator=(const DomainPart &); public: - enum Crc { - ccitt_crc32=1, - xxh64=2 - }; typedef std::shared_ptr<DomainPart> SP; - DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc, - const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); + DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, + uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); ~DomainPart(); @@ -55,7 +52,6 @@ private: static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate); void write(FastOS_FileInterface &file, const Packet::Entry &entry); - static int32_t calcCrc(Crc crc, const void * buf, size_t len); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo @@ -77,21 +73,22 @@ private: }; typedef std::vector<SkipInfo> SkipList; typedef std::map<SerialNum, Packet> PacketList; - const Crc _defaultCrc; - vespalib::Lock _lock; - vespalib::Lock _fileLock; - SerialNumRange _range; - size_t _sz; + const Encoding _encoding; + const uint8_t _compressionLevel; + vespalib::Lock _lock; + vespalib::Lock _fileLock; + SerialNumRange _range; + size_t _sz; std::atomic<uint64_t> _byteSize; - PacketList _packets; - vespalib::string _fileName; + PacketList _packets; + vespalib::string _fileName; std::unique_ptr<FastOS_FileInterface> _transLog; - SkipList _skipList; - uint32_t _headerLen; - vespalib::Lock _writeLock; + SkipList _skipList; + uint32_t _headerLen; + vespalib::Lock _writeLock; // Protected by _writeLock - SerialNum _writtenSerial; - SerialNum _syncedSerial; + SerialNum _writtenSerial; + SerialNum _syncedSerial; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index caef792704a..b98453a7648 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/time.h> @@ -78,25 +79,25 @@ SyncHandler::PerformTask() TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, + DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none)) + .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 100us)) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, DomainPart::Crc::xxh64) + const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize, - size_t maxThreads, DomainPart::Crc defaultCrcType) + const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads) : FRT_Invokable(), _name(name), _baseDir(baseDir), - _domainPartSize(domainPartSize), - _defaultCrcType(defaultCrcType), + _domainConfig(cfg), _commitExecutor(maxThreads, 128*1024), _sessionExecutor(maxThreads, 128*1024), - _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _threadPool(std::make_unique<FastOS_ThreadPool>(1024*120)), _transport(std::make_unique<FNET_Transport>()), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _domains(), @@ -112,8 +113,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con domainDir >> domainName; if ( ! domainName.empty()) { try { - auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultCrcType,_fileHeaderContext); + auto domain = make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor, + _sessionExecutor, cfg, _fileHeaderContext); _domains[domain->name()] = domain; } catch (const std::exception & e) { LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what()); @@ -202,10 +203,21 @@ TransLogServer::run() req->Return(); } } - } while (running() && !(hasPacket && (req == NULL))); + } while (running() && !(hasPacket && (req == nullptr))); LOG(info, "TLS Stopped"); } + +TransLogServer & +TransLogServer::setDomainConfig(const DomainConfig & cfg) { + Guard domainGuard(_lock); + _domainConfig = cfg; + for(auto &domain: _domains) { + domain.second->setConfig(cfg); + } + return *this; +} + DomainStats TransLogServer::getDomainStats() const { @@ -434,8 +446,8 @@ TransLogServer::createDomain(FRT_RPCRequest *req) Domain::SP domain(findDomain(domainName)); if ( !domain ) { try { - domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultCrcType, _fileHeaderContext); + domain = std::make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor, + _sessionExecutor, _domainConfig, _fileHeaderContext); Guard domainGuard(_lock); _domains[domain->name()] = domain; writeDomainDir(domainGuard, dir(), domainList(), _domains); @@ -544,10 +556,9 @@ TransLogServer::domainStatus(FRT_RPCRequest *req) void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) { - (void) done; Domain::SP domain(findDomain(domainName)); if (domain) { - domain->commit(packet); + domain->commit(packet, std::move(done)); } else { throw IllegalArgumentException("Could not find domain " + domainName); } @@ -564,7 +575,9 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) if (domain) { Packet packet(params[1]._data._buf, params[1]._data._len); try { - domain->commit(packet); + vespalib::Gate gate; + domain->commit(packet, make_shared<GateCallback>(gate)); + gate.await(); ret.AddInt32(0); ret.AddString("ok"); } catch (const std::exception & e) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 0d65f36e07d..3f945977386 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -26,16 +26,15 @@ public: typedef std::shared_ptr<TransLogServer> SP; TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, - uint64_t domainPartSize, size_t maxThreads, DomainPart::Crc defaultCrc); + const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize); + const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; - void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; + TransLogServer & setDomainConfig(const DomainConfig & cfg); class Session { @@ -82,8 +81,7 @@ private: vespalib::string _name; vespalib::string _baseDir; - const uint64_t _domainPartSize; - const DomainPart::Crc _defaultCrcType; + DomainConfig _domainConfig; vespalib::ThreadStackExecutor _commitExecutor; vespalib::ThreadStackExecutor _sessionExecutor; std::unique_ptr<FastOS_ThreadPool> _threadPool; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index b8d21fb7465..2dedd9131ae 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -2,6 +2,7 @@ #include "translogserverapp.h" #include <vespa/config/subscription/configuri.h> +#include <vespa/vespalib/util/time.h> #include <vespa/log/log.h> LOG_SETUP(".translogserverapp"); @@ -24,16 +25,59 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, namespace { -DomainPart::Crc +Encoding::Crc getCrc(searchlib::TranslogserverConfig::Crcmethod crcType) { switch (crcType) { case searchlib::TranslogserverConfig::Crcmethod::ccitt_crc32: - return DomainPart::ccitt_crc32; + return Encoding::Crc::ccitt_crc32; case searchlib::TranslogserverConfig::Crcmethod::xxh64: - return DomainPart::xxh64; + return Encoding::Crc ::xxh64; } - LOG_ABORT("should not be reached"); + assert(false); +} + +Encoding::Compression +getCompression(searchlib::TranslogserverConfig::Compression::Type type) +{ + switch (type) { + case searchlib::TranslogserverConfig::Compression::Type::NONE: + return Encoding::Compression::none; + case searchlib::TranslogserverConfig::Compression::Type::NONE_MULTI: + return Encoding::Compression::none_multi; + case searchlib::TranslogserverConfig::Compression::Type::LZ4: + return Encoding::Compression::lz4; + case searchlib::TranslogserverConfig::Compression::Type::ZSTD: + return Encoding::Compression::zstd; + } + assert(false); +} + +Encoding +getEncoding(const searchlib::TranslogserverConfig & cfg) +{ + return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type)); +} + +DomainConfig +getDomainConfig(const searchlib::TranslogserverConfig & cfg) { + DomainConfig dcfg; + dcfg.setEncoding(getEncoding(cfg)) + .setCompressionLevel(cfg.compression.level) + .setPartSizeLimit(cfg.filesizemax) + .setChunkSizeLimit(cfg.chunk.sizelimit) + .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)); + return dcfg; +} + +void +logReconfig(const searchlib::TranslogserverConfig & cfg, const DomainConfig & dcfg) { + LOG(config, "configure Transaction Log Server %s at port %d\n" + "DomainConfig {encoding={%d, %d}, compression_level=%d, part_limit=%ld, chunk_limit=%ld age=%1.4f}", + cfg.servername.c_str(), cfg.listenport, + dcfg.getEncoding().getCrc(), dcfg.getEncoding().getCompression(), dcfg.getCompressionlevel(), + dcfg.getPartSizeLimit(), dcfg.getChunkSizeLimit(), vespalib::to_s(dcfg.getChunkAgeLimit()) + ); } } @@ -41,11 +85,12 @@ getCrc(searchlib::TranslogserverConfig::Crcmethod crcType) void TransLogServerApp::start() { - std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get(); - auto tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, - c->filesizemax, c->maxthreads, getCrc(c->crcmethod)); std::lock_guard<std::mutex> guard(_lock); - _tls = std::move(tls); + auto c = _tlsConfig.get(); + DomainConfig domainConfig = getDomainConfig(*c); + logReconfig(*c, domainConfig); + _tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, + domainConfig, c->maxthreads); } TransLogServerApp::~TransLogServerApp() @@ -56,9 +101,15 @@ TransLogServerApp::~TransLogServerApp() void TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) { - LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport); + + std::lock_guard<std::mutex> guard(_lock); + DomainConfig dcfg = getDomainConfig(*cfg); + logReconfig(*cfg, dcfg); _tlsConfig.set(cfg.release()); _tlsConfig.latch(); + if (_tls) { + _tls->setDomainConfig(dcfg); + } } TransLogServer::SP |