summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorArnstein Ressem <aressem@gmail.com>2020-09-02 20:54:35 +0200
committerGitHub <noreply@github.com>2020-09-02 20:54:35 +0200
commit8301699fbc42ac165c9e1ab09f3dbbc680181821 (patch)
tree0a66b82ddb9a099568b54f26cdf9b374fd9ad26a /searchlib
parent51f266785a4d6f1b3ac3e88ac897adae2ab94459 (diff)
Revert "Configure compression and chunk size"
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp493
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/config/translogserver.def16
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp79
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h79
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp72
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.h35
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp45
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp69
12 files changed, 361 insertions, 543 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index d003bad0582..0dced597917 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/vespalib/objects/identifiable.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/fastos/file.h>
+#include <map>
#include <vespa/log/log.h>
LOG_SETUP("translogclient_test");
@@ -13,24 +14,9 @@ using namespace search;
using namespace transactionlog;
using namespace document;
using namespace vespalib;
-using namespace std::chrono_literals;
using search::index::DummyFileHeaderContext;
-namespace {
-
-bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0);
-TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name);
-bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name);
-void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries);
-void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
-uint32_t countFiles(const vespalib::string &dir);
-void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
-bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
-void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains);
-void verifyDomain(const vespalib::string & name);
-
-vespalib::string
-myhex(const void * b, size_t sz)
+vespalib::string myhex(const void * b, size_t sz)
{
static const char * hextab="0123456789ABCDEF";
const unsigned char * c = static_cast<const unsigned char *>(b);
@@ -43,6 +29,35 @@ myhex(const void * b, size_t sz)
return s;
}
+class Test : public vespalib::TestApp
+{
+public:
+ int Main() override;
+private:
+ bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0);
+ TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name);
+ bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name);
+ void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries);
+ void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
+ uint32_t countFiles(const vespalib::string &dir);
+ void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
+ bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
+ bool partialUpdateTest();
+ bool testVisitOverGeneratedDomain();
+ bool testRemove();
+ void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains);
+ void verifyDomain(const vespalib::string & name);
+ void testCrcVersions();
+ bool testVisitOverPreExistingDomain();
+ void testMany();
+ void testErase();
+ void testSync();
+ void testTruncateOnShortRead();
+ void testTruncateOnVersionMismatch();
+};
+
+TEST_APPHOOK(Test);
+
class CallBackTest : public TransLogClient::Visitor::Callback
{
private:
@@ -60,8 +75,7 @@ public:
bool _eof;
};
-RPC::Result
-CallBackTest::receive(const Packet & p)
+RPC::Result CallBackTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size());
LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(h.peek(), h.size()).c_str());
@@ -87,8 +101,7 @@ public:
size_t _value;
};
-RPC::Result
-CallBackManyTest::receive(const Packet & p)
+RPC::Result CallBackManyTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size());
for(;h.size() > 0; _count++, _value++) {
@@ -120,8 +133,7 @@ public:
};
-RPC::Result
-CallBackUpdate::receive(const Packet & packet)
+RPC::Result CallBackUpdate::receive(const Packet & packet)
{
nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size());
while (h.size() > 0) {
@@ -173,8 +185,7 @@ public:
SerialNum _prevSerial;
};
-RPC::Result
-CallBackStatsTest::receive(const Packet & p)
+RPC::Result CallBackStatsTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size());
for(;h.size() > 0; ++_count) {
@@ -210,10 +221,67 @@ public:
IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable);
-constexpr size_t DEFAULT_PACKET_SIZE = 0xf000;
+bool Test::partialUpdateTest()
+{
+ bool retval(false);
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogClient tls("tcp/localhost:18377");
+
+ TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
+ TransLogClient::Session & session = *s1;
+
+ TestIdentifiable du;
+
+ nbostream os;
+ os << du;
+
+ vespalib::ConstBufferRef bb(os.data(), os.size());
+ LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str());
+ Packet::Entry e(7, du.getClass().id(), bb);
+ Packet pa;
+ pa.add(e);
+ pa.close();
+ ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size())));
+
+ CallBackUpdate ca;
+ TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca);
+ ASSERT_TRUE(visitor.get());
+ ASSERT_TRUE( visitor->visit(5, 7) );
+ for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.map().size() == 1);
+ ASSERT_TRUE( ca.hasSerial(7) );
+
+ CallBackUpdate ca1;
+ TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1);
+ ASSERT_TRUE(visitor1.get());
+ ASSERT_TRUE( visitor1->visit(4, 5) );
+ for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca1._eof );
+ ASSERT_TRUE( ca1.map().size() == 0);
+
+ CallBackUpdate ca2;
+ TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2);
+ ASSERT_TRUE(visitor2.get());
+ ASSERT_TRUE( visitor2->visit(5, 6) );
+ for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca2._eof );
+ ASSERT_TRUE( ca2.map().size() == 0);
+
+ CallBackUpdate ca3;
+ TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3);
+ ASSERT_TRUE(visitor3.get());
+ ASSERT_TRUE( visitor3->visit(5, 1000) );
+ for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca3._eof );
+ ASSERT_TRUE( ca3.map().size() == 1);
+ ASSERT_TRUE( ca3.hasSerial(7) );
+
+ return retval;
+}
-bool
-createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains)
+bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains)
{
bool retval(true);
std::vector<vespalib::string> dir;
@@ -230,40 +298,43 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre
return retval;
}
-TransLogClient::Session::UP
-openDomainTest(TransLogClient & tls, const vespalib::string & name)
+TransLogClient::Session::UP Test::openDomainTest(TransLogClient & tls, const vespalib::string & name)
{
TransLogClient::Session::UP s1 = tls.open(name);
ASSERT_TRUE (s1.get() != NULL);
return s1;
}
-bool
-fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
+bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
{
bool retval(true);
Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20));
Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20));
Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20));
- Packet a(DEFAULT_PACKET_SIZE);
- a.add(e1);
- Packet b(DEFAULT_PACKET_SIZE);
- b.add(e2);
- b.add(e3);
- EXPECT_FALSE(b.add(e1));
+ Packet a;
+ ASSERT_TRUE (a.add(e1));
+ Packet b;
+ ASSERT_TRUE (b.add(e2));
+ ASSERT_TRUE (b.add(e3));
+ ASSERT_TRUE (!b.add(e1));
+ a.close();
+ b.close();
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())));
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size())));
- EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())),
- std::runtime_error,
- "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3).");
+ try {
+ s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size()));
+ ASSERT_TRUE(false);
+ } catch (const std::exception & e) {
+ EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what());
+ }
EXPECT_EQUAL(a.size(), 1u);
EXPECT_EQUAL(a.range().from(), 1u);
EXPECT_EQUAL(a.range().to(), 1u);
EXPECT_EQUAL(b.size(), 2u);
EXPECT_EQUAL(b.range().from(), 2u);
EXPECT_EQUAL(b.range().to(), 3u);
- a.merge(b);
+ EXPECT_TRUE(a.merge(b));
EXPECT_EQUAL(a.size(), 3u);
EXPECT_EQUAL(a.range().from(), 1u);
EXPECT_EQUAL(a.range().to(), 3u);
@@ -278,82 +349,52 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
return retval;
}
-void
-fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries)
+void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries)
{
size_t value(0);
for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
+ std::unique_ptr<Packet> p(new Packet());
for(size_t j=0; j < numEntries; j++, value++) {
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
- p->add(e);
- if (p->sizeBytes() > DEFAULT_PACKET_SIZE){
+ if ( ! p->add(e) ) {
+ p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size())));
- p.reset(new Packet(DEFAULT_PACKET_SIZE));
+ p.reset(new Packet());
+ ASSERT_TRUE(p->add(e));
}
}
+ p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size())));
}
}
-using Counter = std::atomic<size_t>;
-
-class CountDone : public IDestructorCallback {
-public:
- CountDone(Counter & inFlight) : _inFlight(inFlight) { ++_inFlight; }
- ~CountDone() override { --_inFlight; }
-private:
- Counter & _inFlight;
-};
-
-void
-fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries)
-{
- size_t value(0);
- Counter inFlight(0);
- for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
- for(size_t j=0; j < numEntries; j++, value++) {
- Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
- p->add(e);
- if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
- s1.commit(domain, *p, std::make_shared<CountDone>(inFlight));
- p.reset(new Packet(DEFAULT_PACKET_SIZE));
- }
- }
- s1.commit(domain, *p, std::make_shared<CountDone>(inFlight));
- LOG(info, "Inflight %ld", inFlight.load());
- }
- while (inFlight.load() != 0) {
- std::this_thread::sleep_for(10ms);
- LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load());
- }
-
-}
-
void
-fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
+Test::fillDomainTest(TransLogClient::Session * s1,
+ size_t numPackets, size_t numEntries,
+ size_t entrySize)
{
size_t value(0);
std::vector<char> entryBuffer(entrySize);
for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
+ std::unique_ptr<Packet> p(new Packet());
for(size_t j=0; j < numEntries; j++, value++) {
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size()));
- p->add(e);
- if (p->sizeBytes() > DEFAULT_PACKET_SIZE){
+ if ( ! p->add(e) ) {
+ p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size())));
- p.reset(new Packet(DEFAULT_PACKET_SIZE));
+ p.reset(new Packet());
+ ASSERT_TRUE(p->add(e));
}
}
+ p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size())));
}
}
uint32_t
-countFiles(const vespalib::string &dir)
+Test::countFiles(const vespalib::string &dir)
{
uint32_t res = 0;
FastOS_DirectoryScan dirScan(dir.c_str());
@@ -367,8 +408,10 @@ countFiles(const vespalib::string &dir)
return res;
}
+
void
-checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
+Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1,
+ size_t numEntries)
{
SerialNum b(0), e(0);
size_t c(0);
@@ -378,8 +421,8 @@ checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
EXPECT_EQUAL(c, numEntries);
}
-bool
-visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name)
+
+bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name)
{
bool retval(true);
@@ -443,31 +486,10 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain)
return tls.getDomainStats()[domain].maxSessionRunTime.count();
}
-void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains)
+bool Test::testVisitOverGeneratedDomain()
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test13", 18377, ".", fileHeaderContext,
- DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4);
- TransLogClient tls("tcp/localhost:18377");
-
- createDomainTest(tls, name, preExistingDomains);
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- fillDomainTest(s1.get(), name);
-}
-
-void verifyDomain(const vespalib::string & name) {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
- TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- visitDomainTest(tls, s1.get(), name);
-}
-
-}
-
-TEST("testVisitOverGeneratedDomain") {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test1");
@@ -479,85 +501,42 @@ TEST("testVisitOverGeneratedDomain") {
double maxSessionRunTime = getMaxSessionRunTime(tlss, "test1");
LOG(info, "testVisitOverGeneratedDomain(): maxSessionRunTime=%f", maxSessionRunTime);
EXPECT_GREATER(maxSessionRunTime, 0);
+ return true;
}
-TEST("testVisitOverPreExistingDomain") {
- // Depends on Test::testVisitOverGeneratedDomain()
+void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains)
+{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, crcMethod);
TransLogClient tls("tcp/localhost:18377");
- vespalib::string name("test1");
+ createDomainTest(tls, name, preExistingDomains);
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- visitDomainTest(tls, s1.get(), name);
+ fillDomainTest(s1.get(), name);
}
-TEST("partialUpdateTest") {
+void Test::verifyDomain(const vespalib::string & name)
+{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls("tcp/localhost:18377");
-
- TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
- TransLogClient::Session & session = *s1;
-
- TestIdentifiable du;
-
- nbostream os;
- os << du;
-
- vespalib::ConstBufferRef bb(os.data(), os.size());
- LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str());
- Packet::Entry e(7, du.getClass().id(), bb);
- Packet pa(DEFAULT_PACKET_SIZE);
- pa.add(e);
- ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size())));
-
- CallBackUpdate ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca);
- ASSERT_TRUE(visitor.get());
- ASSERT_TRUE( visitor->visit(5, 7) );
- for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
- ASSERT_TRUE( ca.map().size() == 1);
- ASSERT_TRUE( ca.hasSerial(7) );
-
- CallBackUpdate ca1;
- TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1);
- ASSERT_TRUE(visitor1.get());
- ASSERT_TRUE( visitor1->visit(4, 5) );
- for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca1._eof );
- ASSERT_TRUE( ca1.map().size() == 0);
-
- CallBackUpdate ca2;
- TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2);
- ASSERT_TRUE(visitor2.get());
- ASSERT_TRUE( visitor2->visit(5, 6) );
- for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca2._eof );
- ASSERT_TRUE( ca2.map().size() == 0);
-
- CallBackUpdate ca3;
- TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3);
- ASSERT_TRUE(visitor3.get());
- ASSERT_TRUE( visitor3->visit(5, 1000) );
- for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca3._eof );
- ASSERT_TRUE( ca3.map().size() == 1);
- ASSERT_TRUE( ca3.hasSerial(7) );
+ TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ visitDomainTest(tls, s1.get(), name);
}
-TEST("testCrcVersions") {
- createAndFillDomain("ccitt_crc32", Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none), 0);
- createAndFillDomain("xxh64", Encoding(Encoding::Crc::xxh64, Encoding::Compression::none), 1);
+void Test::testCrcVersions()
+{
+ createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0);
+ createAndFillDomain("xxh64", DomainPart::xxh64, 1);
verifyDomain("ccitt_crc32");
verifyDomain("xxh64");
}
-TEST("testRemove") {
+bool Test::testRemove()
+{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test-delete");
@@ -566,6 +545,21 @@ TEST("testRemove") {
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
ASSERT_TRUE(tls.remove(name));
+
+ return true;
+}
+
+bool Test::testVisitOverPreExistingDomain()
+{
+ // Depends on Test::testVisitOverGeneratedDomain()
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogClient tls("tcp/localhost:18377");
+
+ vespalib::string name("test1");
+ TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ visitDomainTest(tls, s1.get(), name);
+ return true;
}
namespace {
@@ -606,19 +600,18 @@ assertStatus(TransLogClient::Session &s,
}
-TEST("test sending a lot of data") {
+void Test::testMany()
+{
const unsigned int NUM_PACKETS = 1000;
const unsigned int NUM_ENTRIES = 100;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
- const vespalib::string MANY("many");
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)
- .setChunkAgeLimit(100us));
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000);
TransLogClient tls("tcp/localhost:18377");
- createDomainTest(tls, MANY, 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ createDomainTest(tls, "many", 0);
+ TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
SerialNum b(0), e(0);
size_t c(0);
@@ -637,7 +630,7 @@ TEST("test sending a lot of data") {
}
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
@@ -648,79 +641,7 @@ TEST("test sending a lot of data") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
- ASSERT_TRUE(visitor.get());
- ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
- EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
- }
- {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
- TransLogClient tls("tcp/localhost:18377");
-
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
- SerialNum b(0), e(0);
- size_t c(0);
- EXPECT_TRUE(s1->status(b, e, c));
- EXPECT_EQUAL(b, 1u);
- EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
- CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
- ASSERT_TRUE(visitor.get());
- ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
- EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
- }
-}
-
-TEST("test sending a lot of data async") {
- const unsigned int NUM_PACKETS = 1000;
- const unsigned int NUM_ENTRIES = 100;
- const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
- const vespalib::string MANY("many-async");
- {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)
- .setChunkAgeLimit(10ms));
- TransLogClient tls("tcp/localhost:18377");
- createDomainTest(tls, MANY, 1);
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
- fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES);
- SerialNum b(0), e(0);
- size_t c(0);
- EXPECT_TRUE(s1->status(b, e, c));
-
- EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
- CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
- ASSERT_TRUE(visitor.get());
- ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
- EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
- }
- {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
- TransLogClient tls("tcp/localhost:18377");
-
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
- SerialNum b(0), e(0);
- size_t c(0);
- EXPECT_TRUE(s1->status(b, e, c));
- EXPECT_EQUAL(b, 1u);
- EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
- CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -730,16 +651,14 @@ TEST("test sending a lot of data async") {
}
}
-
-
-
-TEST("testErase") {
+void Test::testErase()
+{
const unsigned int NUM_PACKETS = 1000;
const unsigned int NUM_ENTRIES = 100;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000));
+ TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000);
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "erase", 0);
@@ -748,7 +667,7 @@ TEST("testErase") {
}
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "erase");
@@ -829,13 +748,16 @@ TEST("testErase") {
}
}
-TEST("testSync") {
+
+void
+Test::testSync()
+{
const unsigned int NUM_PACKETS = 3;
const unsigned int NUM_ENTRIES = 4;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
@@ -848,7 +770,10 @@ TEST("testSync") {
EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES);
}
-TEST("test truncate on version mismatch") {
+
+void
+Test::testTruncateOnVersionMismatch()
+{
const unsigned int NUM_PACKETS = 3;
const unsigned int NUM_ENTRIES = 4;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
@@ -857,7 +782,7 @@ TEST("test truncate on version mismatch") {
size_t countOld(0);
DummyFileHeaderContext fileHeaderContext;
{
- TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
@@ -878,7 +803,7 @@ TEST("test truncate on version mismatch") {
EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp)));
EXPECT_TRUE(f.Close());
{
- TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
uint64_t from(0), to(0);
@@ -890,7 +815,9 @@ TEST("test truncate on version mismatch") {
}
}
-TEST("test truncation after short read") {
+void
+Test::testTruncateOnShortRead()
+{
const unsigned int NUM_PACKETS = 17;
const unsigned int NUM_ENTRIES = 1;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
@@ -902,7 +829,7 @@ TEST("test truncation after short read") {
DummyFileHeaderContext fileHeaderContext;
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls(tlsspec);
createDomainTest(tls, domain, 0);
@@ -918,7 +845,7 @@ TEST("test truncation after short read") {
EXPECT_EQUAL(2u, countFiles(dir));
}
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls(tlsspec);
TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES);
@@ -934,7 +861,7 @@ TEST("test truncation after short read") {
trfile.Close();
}
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls(tlsspec);
TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1);
@@ -944,4 +871,28 @@ TEST("test truncation after short read") {
}
}
-TEST_MAIN() { TEST_RUN_ALL(); }
+
+int Test::Main()
+{
+ TEST_INIT("translogclient_test");
+
+ if (_argc > 0) {
+ DummyFileHeaderContext::setCreator(_argv[0]);
+ }
+ testVisitOverGeneratedDomain();
+ testVisitOverPreExistingDomain();
+ testMany();
+ testErase();
+ partialUpdateTest();
+
+ testRemove();
+
+ testSync();
+
+ testTruncateOnShortRead();
+ testTruncateOnVersionMismatch();
+
+ testCrcVersions();
+
+ TEST_DONE();
+}
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index 925f297bf48..013ca81dcc9 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -698,7 +698,7 @@ TransLogStress::Main()
// start transaction log server
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize));
+ TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize);
TransLogClient client(tlsSpec);
client.create(domain);
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def
index f822fc80fc1..74efe3fe68e 100644
--- a/searchlib/src/vespa/searchlib/config/translogserver.def
+++ b/searchlib/src/vespa/searchlib/config/translogserver.def
@@ -5,7 +5,7 @@ namespace=searchlib
listenport int default=13700 restart
## Max file size (50M)
-filesizemax int default=50000000
+filesizemax int default=50000000 restart
## Server name to identify server.
servername string default="tls" restart
@@ -22,17 +22,3 @@ maxthreads int default=4 restart
##Default crc method used
crcmethod enum {ccitt_crc32, xxh64} default=xxh64
-
-## Control compression type.
-compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=LZ4
-
-## Control compression level
-## LZ4 has normal range 1..9 while ZSTD has range 1..19
-## 9 is a reasonable default for both
-compression.level int default=9
-
-## How large a chunk can grow in memory before beeing flushed
-chunk.sizelimit int default = 256000 # 256k
-
-## How long a chunk can reside in memory befor ebeeing flushed to disk.
-chunk.agelimit double default = 0.010 # 10 milliseconds
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 804a558789e..5e7cfc74199 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -4,14 +4,11 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/file.h>
#include <algorithm>
#include <thread>
#include <vespa/log/log.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
-
LOG_SETUP(".transactionlog.domain");
using vespalib::string;
@@ -23,43 +20,29 @@ using vespalib::Monitor;
using vespalib::MonitorGuard;
using search::common::FileHeaderContext;
using std::runtime_error;
-using std::make_shared;
+using namespace std::chrono_literals;
namespace search::transactionlog {
-VESPA_THREAD_STACK_TAG(domain_commit_executor);
-
-DomainConfig::DomainConfig()
- : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none),
- _compressionLevel(9),
- _partSizeLimit(0x10000000), // 256M
- _chunkSizeLimit(0x40000), // 256k
- _chunkAgeLimit(10ms)
-{ }
-
-Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool,
- Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg,
- const FileHeaderContext &fileHeaderContext)
- : _config(cfg),
- _lastSerial(0),
- _threadPool(threadPool),
- _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)),
- _commitExecutor(commitExecutor),
- _sessionExecutor(sessionExecutor),
- _sessionId(1),
- _syncMonitor(),
- _pendingSync(false),
- _name(domainName),
- _parts(),
- _lock(),
- _currentChunkMonitor(),
- _sessionLock(),
- _sessions(),
- _maxSessionRunTime(),
- _baseDir(baseDir),
- _fileHeaderContext(fileHeaderContext),
- _markedDeleted(false),
- _self(nullptr)
+Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor,
+ Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType,
+ const FileHeaderContext &fileHeaderContext) :
+ _defaultCrcType(defaultCrcType),
+ _commitExecutor(commitExecutor),
+ _sessionExecutor(sessionExecutor),
+ _sessionId(1),
+ _syncMonitor(),
+ _pendingSync(false),
+ _name(domainName),
+ _domainPartSize(domainPartSize),
+ _parts(),
+ _lock(),
+ _sessionLock(),
+ _sessions(),
+ _maxSessionRunTime(),
+ _baseDir(baseDir),
+ _fileHeaderContext(fileHeaderContext),
+ _markedDeleted(false)
{
int retval(0);
if ((retval = makeDirectory(_baseDir.c_str())) != 0) {
@@ -77,22 +60,13 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo
}
_sessionExecutor.sync();
if (_parts.empty() || _parts.crbegin()->second->isClosed()) {
- _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, false);
+ _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false);
vespalib::File::sync(dir());
}
- _lastSerial = end();
-}
-
-Domain &
-Domain::setConfig(const DomainConfig & cfg) {
- _config = cfg;
- return *this;
}
void Domain::addPart(int64_t partId, bool isLastPart) {
- auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, isLastPart);
+ auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart);
if (dp->size() == 0) {
// Only last domain part is allowed to be truncated down to
// empty size.
@@ -311,21 +285,18 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
}
-void
-Domain::commit(const Packet & packet, Writer::DoneCallback onDone)
+void Domain::commit(const Packet & packet)
{
- (void) onDone;
DomainPart::SP dp(_parts.rbegin()->second);
vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
- if (dp->byteSize() > _config.getPartSizeLimit()) {
+ if (dp->byteSize() > _domainPartSize) {
waitPendingSync(_syncMonitor, _pendingSync);
triggerSyncNow();
waitPendingSync(_syncMonitor, _pendingSync);
dp->close();
- dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, false);
+ dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _defaultCrcType, _fileHeaderContext, false);
{
LockGuard guard(_lock);
_parts[entry.serial()] = dp;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index cc7f4ac5e4b..d6f964d5140 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -4,45 +4,20 @@
#include "domainpart.h"
#include "session.h"
#include <vespa/vespalib/util/threadexecutor.h>
-#include <vespa/vespalib/util/time.h>
-#include <vespa/fastos/thread.h>
#include <chrono>
namespace search::transactionlog {
-class DomainConfig {
-public:
- using duration = vespalib::duration;
- DomainConfig();
- DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; }
- DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; }
- DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; }
- DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; }
- DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; }
- Encoding getEncoding() const { return _encoding; }
- size_t getPartSizeLimit() const { return _partSizeLimit; }
- size_t getChunkSizeLimit() const { return _chunkSizeLimit; }
- duration getChunkAgeLimit() const { return _chunkAgeLimit; }
- uint8_t getCompressionlevel() const { return _compressionLevel; }
-private:
- Encoding _encoding;
- uint8_t _compressionLevel;
- size_t _partSizeLimit;
- size_t _chunkSizeLimit;
- duration _chunkAgeLimit;
-};
-
struct PartInfo {
SerialNumRange range;
size_t numEntries;
size_t byteSize;
vespalib::string file;
- PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in)
- : range(range_in),
- numEntries(numEntries_in),
- byteSize(byteSize_in),
- file(file_in)
- {}
+ PartInfo(SerialNumRange range_in, size_t numEntries_in,
+ size_t byteSize_in,
+ vespalib::stringref file_in)
+ : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in),
+ file(file_in) {}
};
struct DomainInfo {
@@ -65,17 +40,17 @@ class Domain
public:
using SP = std::shared_ptr<Domain>;
using Executor = vespalib::SyncableThreadExecutor;
- Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool,
- Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg,
+ Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor,
+ Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType,
const common::FileHeaderContext &fileHeaderContext);
- ~Domain();
+ virtual ~Domain();
DomainInfo getDomainInfo() const;
const vespalib::string & name() const { return _name; }
bool erase(SerialNum to);
- void commit(const Packet & packet, Writer::DoneCallback onDone);
+ void commit(const Packet & packet);
int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest);
SerialNum begin() const;
@@ -102,7 +77,7 @@ public:
return _sessionExecutor.execute(std::move(task));
}
uint64_t size() const;
- Domain & setConfig(const DomainConfig & cfg);
+
private:
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
@@ -120,26 +95,22 @@ private:
using DomainPartList = std::map<int64_t, DomainPart::SP>;
using DurationSeconds = std::chrono::duration<double>;
- DomainConfig _config;
- SerialNum _lastSerial;
- FastOS_ThreadPool & _threadPool;
- std::unique_ptr<Executor> _singleCommiter;
- Executor & _commitExecutor;
- Executor & _sessionExecutor;
- std::atomic<int> _sessionId;
- vespalib::Monitor _syncMonitor;
- bool _pendingSync;
- vespalib::string _name;
- DomainPartList _parts;
- vespalib::Lock _lock;
- vespalib::Monitor _currentChunkMonitor;
- vespalib::Lock _sessionLock;
- SessionList _sessions;
- DurationSeconds _maxSessionRunTime;
- vespalib::string _baseDir;
+ DomainPart::Crc _defaultCrcType;
+ Executor & _commitExecutor;
+ Executor & _sessionExecutor;
+ std::atomic<int> _sessionId;
+ vespalib::Monitor _syncMonitor;
+ bool _pendingSync;
+ vespalib::string _name;
+ uint64_t _domainPartSize;
+ DomainPartList _parts;
+ vespalib::Lock _lock;
+ vespalib::Lock _sessionLock;
+ SessionList _sessions;
+ DurationSeconds _maxSessionRunTime;
+ vespalib::string _baseDir;
const common::FileHeaderContext &_fileHeaderContext;
- bool _markedDeleted;
- FastOS_ThreadInterface * _self;
+ bool _markedDeleted;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index a6f140fce24..91599f8218a 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -176,20 +176,6 @@ handleReadError(const char *text,
return retval;
}
-int32_t
-calcCrc(Encoding::Crc version, const void * buf, size_t sz)
-{
- if (version == Encoding::Crc::xxh64) {
- return static_cast<int32_t>(XXH64(buf, sz, 0ll));
- } else if (version == Encoding::Crc::ccitt_crc32) {
- vespalib::crc_32_type calculator;
- calculator.process_bytes(buf, sz);
- return calculator.checksum();
- } else {
- LOG_ABORT("should not be reached");
- }
-}
-
}
int64_t
@@ -279,23 +265,22 @@ DomainPart::buildPacketMapping(bool allowTruncate)
return currPos;
}
-DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding,
- uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate)
- : _encoding(encoding),
- _compressionLevel(compressionLevel),
- _lock(),
- _fileLock(),
- _range(s),
- _sz(0),
- _byteSize(0),
- _packets(),
- _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)),
- _transLog(std::make_unique<FastOS_File>(_fileName.c_str())),
- _skipList(),
- _headerLen(0),
- _writeLock(),
- _writtenSerial(0),
- _syncedSerial(0)
+DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Crc defaultCrc,
+ const FileHeaderContext &fileHeaderContext, bool allowTruncate) :
+ _defaultCrc(defaultCrc),
+ _lock(),
+ _fileLock(),
+ _range(s),
+ _sz(0),
+ _byteSize(0),
+ _packets(),
+ _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)),
+ _transLog(std::make_unique<FastOS_File>(_fileName.c_str())),
+ _skipList(),
+ _headerLen(0),
+ _writeLock(),
+ _writtenSerial(0),
+ _syncedSerial(0)
{
if (_transLog->OpenReadOnly()) {
int64_t currPos = buildPacketMapping(allowTruncate);
@@ -597,12 +582,12 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry)
int32_t crc(0);
uint32_t len(entry.serializedSize() + sizeof(crc));
nbostream os;
- os << static_cast<uint8_t>(_encoding.getRaw());
+ os << static_cast<uint8_t>(_defaultCrc);
os << len;
size_t start(os.size());
entry.serialize(os);
size_t end(os.size());
- crc = calcCrc(_encoding.getCrc(), os.data() + start, end - start);
+ crc = calcCrc(_defaultCrc, os.data() + start, end - start);
os << crc;
size_t osSize = os.size();
assert(osSize == len + sizeof(len) + sizeof(uint8_t));
@@ -630,10 +615,10 @@ DomainPart::read(FastOS_FileInterface &file,
uint32_t len(0);
his >> version >> len;
if ((retval = (rlen == sizeof(tmp)))) {
- if ( ! (retval = (version == Encoding::Crc::ccitt_crc32) || version == Encoding::Crc::xxh64)) {
+ if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) {
string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
- " got %d from '%s' at position %" PRId64,
- version, file.GetFileName(), lastKnownGoodPos));
+ " got %d from '%s' at position %" PRId64,
+ version, file.GetFileName(), lastKnownGoodPos));
if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
LOG(warning, "%s", msg.c_str());
return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
@@ -653,7 +638,7 @@ DomainPart::read(FastOS_FileInterface &file,
entry.deserialize(is);
int32_t crc(0);
is >> crc;
- int32_t crcVerify(calcCrc(Encoding(version).getCrc(), buf.get(), len - sizeof(crc)));
+ int32_t crcVerify(calcCrc(static_cast<Crc>(version), buf.get(), len - sizeof(crc)));
if (crc != crcVerify) {
throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d",
file.GetFileName(), file.GetPosition() - len - sizeof(len),
@@ -670,4 +655,17 @@ DomainPart::read(FastOS_FileInterface &file,
return retval;
}
+int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz)
+{
+ if (version == xxh64) {
+ return static_cast<int32_t>(XXH64(buf, sz, 0ll));
+ } else if (version == ccitt_crc32) {
+ vespalib::crc_32_type calculator;
+ calculator.process_bytes(buf, sz);
+ return calculator.checksum();
+ } else {
+ LOG_ABORT("should not be reached");
+ }
+}
+
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
index f3a53c1e9a9..59d0df6df94 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
@@ -2,7 +2,6 @@
#pragma once
#include "common.h"
-#include "ichunk.h"
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/memory.h>
#include <map>
@@ -20,9 +19,13 @@ private:
DomainPart& operator=(const DomainPart &);
public:
+ enum Crc {
+ ccitt_crc32=1,
+ xxh64=2
+ };
typedef std::shared_ptr<DomainPart> SP;
- DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding,
- uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
+ DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc,
+ const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
~DomainPart();
@@ -52,6 +55,7 @@ private:
static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate);
void write(FastOS_FileInterface &file, const Packet::Entry &entry);
+ static int32_t calcCrc(Crc crc, const void * buf, size_t len);
void writeHeader(const common::FileHeaderContext &fileHeaderContext);
class SkipInfo
@@ -73,22 +77,21 @@ private:
};
typedef std::vector<SkipInfo> SkipList;
typedef std::map<SerialNum, Packet> PacketList;
- const Encoding _encoding;
- const uint8_t _compressionLevel;
- vespalib::Lock _lock;
- vespalib::Lock _fileLock;
- SerialNumRange _range;
- size_t _sz;
+ const Crc _defaultCrc;
+ vespalib::Lock _lock;
+ vespalib::Lock _fileLock;
+ SerialNumRange _range;
+ size_t _sz;
std::atomic<uint64_t> _byteSize;
- PacketList _packets;
- vespalib::string _fileName;
+ PacketList _packets;
+ vespalib::string _fileName;
std::unique_ptr<FastOS_FileInterface> _transLog;
- SkipList _skipList;
- uint32_t _headerLen;
- vespalib::Lock _writeLock;
+ SkipList _skipList;
+ uint32_t _headerLen;
+ vespalib::Lock _writeLock;
// Protected by _writeLock
- SerialNum _writtenSerial;
- SerialNum _syncedSerial;
+ SerialNum _writtenSerial;
+ SerialNum _syncedSerial;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index 5e44815cb1b..4aa1b0a5a90 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -22,7 +22,7 @@ public:
lz4 = 2,
zstd = 3
};
- explicit Encoding(uint8_t raw) : _raw(raw) { }
+ Encoding(uint8_t raw) : _raw(raw) {}
Encoding(Crc crc, Compression compression);
Crc getCrc() const { return Crc(_raw & 0xf); }
Compression getCompression() const { return Compression((_raw >> 4) & 0xf); }
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h
index 9b8d23371e8..bf35d83c000 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.h
@@ -25,7 +25,7 @@ private:
public:
class Destination {
public:
- virtual ~Destination() = default;
+ virtual ~Destination() {}
virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0;
virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0;
virtual bool connected() const = 0;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index b98453a7648..caef792704a 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogserver.h"
-#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/time.h>
@@ -79,25 +78,25 @@ SyncHandler::PerformTask()
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const FileHeaderContext &fileHeaderContext)
- : TransLogServer(name, listenPort, baseDir, fileHeaderContext,
- DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none))
- .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 100us))
+ : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000)
{}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg)
- : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4)
+ const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize)
+ : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, DomainPart::Crc::xxh64)
{}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads)
+ const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize,
+ size_t maxThreads, DomainPart::Crc defaultCrcType)
: FRT_Invokable(),
_name(name),
_baseDir(baseDir),
- _domainConfig(cfg),
+ _domainPartSize(domainPartSize),
+ _defaultCrcType(defaultCrcType),
_commitExecutor(maxThreads, 128*1024),
_sessionExecutor(maxThreads, 128*1024),
- _threadPool(std::make_unique<FastOS_ThreadPool>(1024*120)),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
_transport(std::make_unique<FNET_Transport>()),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
_domains(),
@@ -113,8 +112,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
domainDir >> domainName;
if ( ! domainName.empty()) {
try {
- auto domain = make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor,
- _sessionExecutor, cfg, _fileHeaderContext);
+ auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor,
+ _domainPartSize, _defaultCrcType,_fileHeaderContext);
_domains[domain->name()] = domain;
} catch (const std::exception & e) {
LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what());
@@ -203,21 +202,10 @@ TransLogServer::run()
req->Return();
}
}
- } while (running() && !(hasPacket && (req == nullptr)));
+ } while (running() && !(hasPacket && (req == NULL)));
LOG(info, "TLS Stopped");
}
-
-TransLogServer &
-TransLogServer::setDomainConfig(const DomainConfig & cfg) {
- Guard domainGuard(_lock);
- _domainConfig = cfg;
- for(auto &domain: _domains) {
- domain.second->setConfig(cfg);
- }
- return *this;
-}
-
DomainStats
TransLogServer::getDomainStats() const
{
@@ -446,8 +434,8 @@ TransLogServer::createDomain(FRT_RPCRequest *req)
Domain::SP domain(findDomain(domainName));
if ( !domain ) {
try {
- domain = std::make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor,
- _sessionExecutor, _domainConfig, _fileHeaderContext);
+ domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor,
+ _domainPartSize, _defaultCrcType, _fileHeaderContext);
Guard domainGuard(_lock);
_domains[domain->name()] = domain;
writeDomainDir(domainGuard, dir(), domainList(), _domains);
@@ -556,9 +544,10 @@ TransLogServer::domainStatus(FRT_RPCRequest *req)
void
TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done)
{
+ (void) done;
Domain::SP domain(findDomain(domainName));
if (domain) {
- domain->commit(packet, std::move(done));
+ domain->commit(packet);
} else {
throw IllegalArgumentException("Could not find domain " + domainName);
}
@@ -575,9 +564,7 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
if (domain) {
Packet packet(params[1]._data._buf, params[1]._data._len);
try {
- vespalib::Gate gate;
- domain->commit(packet, make_shared<GateCallback>(gate));
- gate.await();
+ domain->commit(packet);
ret.AddInt32(0);
ret.AddString("ok");
} catch (const std::exception & e) {
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 3f945977386..0d65f36e07d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -26,15 +26,16 @@ public:
typedef std::shared_ptr<TransLogServer> SP;
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads);
+ const common::FileHeaderContext &fileHeaderContext,
+ uint64_t domainPartSize, size_t maxThreads, DomainPart::Crc defaultCrc);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg);
+ const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const common::FileHeaderContext &fileHeaderContext);
~TransLogServer() override;
DomainStats getDomainStats() const;
+
void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override;
- TransLogServer & setDomainConfig(const DomainConfig & cfg);
class Session
{
@@ -81,7 +82,8 @@ private:
vespalib::string _name;
vespalib::string _baseDir;
- DomainConfig _domainConfig;
+ const uint64_t _domainPartSize;
+ const DomainPart::Crc _defaultCrcType;
vespalib::ThreadStackExecutor _commitExecutor;
vespalib::ThreadStackExecutor _sessionExecutor;
std::unique_ptr<FastOS_ThreadPool> _threadPool;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
index d83623661ff..b8d21fb7465 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
@@ -2,7 +2,6 @@
#include "translogserverapp.h"
#include <vespa/config/subscription/configuri.h>
-#include <vespa/vespalib/util/time.h>
#include <vespa/log/log.h>
LOG_SETUP(".translogserverapp");
@@ -25,59 +24,16 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri,
namespace {
-Encoding::Crc
+DomainPart::Crc
getCrc(searchlib::TranslogserverConfig::Crcmethod crcType)
{
switch (crcType) {
case searchlib::TranslogserverConfig::Crcmethod::ccitt_crc32:
- return Encoding::Crc::ccitt_crc32;
+ return DomainPart::ccitt_crc32;
case searchlib::TranslogserverConfig::Crcmethod::xxh64:
- return Encoding::Crc::xxh64;
+ return DomainPart::xxh64;
}
- assert(false);
-}
-
-Encoding::Compression
-getCompression(searchlib::TranslogserverConfig::Compression::Type type)
-{
- switch (type) {
- case searchlib::TranslogserverConfig::Compression::Type::NONE:
- return Encoding::Compression::none;
- case searchlib::TranslogserverConfig::Compression::Type::NONE_MULTI:
- return Encoding::Compression::none_multi;
- case searchlib::TranslogserverConfig::Compression::Type::LZ4:
- return Encoding::Compression::lz4;
- case searchlib::TranslogserverConfig::Compression::Type::ZSTD:
- return Encoding::Compression::zstd;
- }
- assert(false);
-}
-
-Encoding
-getEncoding(const searchlib::TranslogserverConfig & cfg)
-{
- return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type));
-}
-
-DomainConfig
-getDomainConfig(const searchlib::TranslogserverConfig & cfg) {
- DomainConfig dcfg;
- dcfg.setEncoding(getEncoding(cfg))
- .setCompressionLevel(cfg.compression.level)
- .setPartSizeLimit(cfg.filesizemax)
- .setChunkSizeLimit(cfg.chunk.sizelimit)
- .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit));
- return dcfg;
-}
-
-void
-logReconfig(const searchlib::TranslogserverConfig & cfg, const DomainConfig & dcfg) {
- LOG(config, "configure Transaction Log Server %s at port %d\n"
- "DomainConfig {encoding={%d, %d}, compression_level=%d, part_limit=%ld, chunk_limit=%ld age=%1.4f}",
- cfg.servername.c_str(), cfg.listenport,
- dcfg.getEncoding().getCrc(), dcfg.getEncoding().getCompression(), dcfg.getCompressionlevel(),
- dcfg.getPartSizeLimit(), dcfg.getChunkSizeLimit(), vespalib::to_s(dcfg.getChunkAgeLimit())
- );
+ LOG_ABORT("should not be reached");
}
}
@@ -85,12 +41,11 @@ logReconfig(const searchlib::TranslogserverConfig & cfg, const DomainConfig & dc
void
TransLogServerApp::start()
{
+ std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get();
+ auto tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext,
+ c->filesizemax, c->maxthreads, getCrc(c->crcmethod));
std::lock_guard<std::mutex> guard(_lock);
- auto c = _tlsConfig.get();
- DomainConfig domainConfig = getDomainConfig(*c);
- logReconfig(*c, domainConfig);
- _tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext,
- domainConfig, c->maxthreads);
+ _tls = std::move(tls);
}
TransLogServerApp::~TransLogServerApp()
@@ -101,15 +56,9 @@ TransLogServerApp::~TransLogServerApp()
void
TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg)
{
-
- std::lock_guard<std::mutex> guard(_lock);
- DomainConfig dcfg = getDomainConfig(*cfg);
- logReconfig(*cfg, dcfg);
+ LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport);
_tlsConfig.set(cfg.release());
_tlsConfig.latch();
- if (_tls) {
- _tls->setDomainConfig(dcfg);
- }
}
TransLogServer::SP