aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-01-14 17:52:55 +0100
committerGitHub <noreply@github.com>2018-01-14 17:52:55 +0100
commit63966afece7d250b4ba4c03ad1f43ee7457b1dec (patch)
treefbdc3cdf92e3ed3bb2c0b657586d9885010323a7
parentf909e2e5f34357af78e28dd4d948134a0fee50aa (diff)
Revert "Revert "Revert "Balder/group multiple commits rebased 1"""
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp6
-rw-r--r--searchcore/src/tests/proton/server/feedstates_test.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp3
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp489
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp9
-rw-r--r--searchlib/src/vespa/searchlib/config/translogserver.def16
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/chunks.cpp128
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/chunks.h38
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.cpp69
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h8
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp200
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h98
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp342
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.h42
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp108
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h57
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp121
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp61
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h1
-rw-r--r--vespalib/src/vespa/vespalib/objects/nbostream.h3
23 files changed, 629 insertions, 1186 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index e56271b3d5a..b8ffc41d3cd 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -9,6 +9,7 @@
#include <vespa/searchcore/proton/feedoperation/putoperation.h>
#include <vespa/searchcore/proton/feedoperation/removeoperation.h>
#include <vespa/searchcore/proton/feedoperation/updateoperation.h>
+#include <vespa/searchcore/proton/feedoperation/wipehistoryoperation.h>
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/server/configstore.h>
#include <vespa/searchcore/proton/server/ddbstate.h>
@@ -17,8 +18,10 @@
#include <vespa/searchcore/proton/server/i_feed_handler_owner.h>
#include <vespa/searchcore/proton/server/ireplayconfig.h>
#include <vespa/searchcore/proton/test/dummy_feed_view.h>
+#include <vespa/searchlib/common/idestructorcallback.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
+#include <vespa/searchlib/transactionlog/translogclient.h>
#include <vespa/searchlib/transactionlog/translogserver.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/closuretask.h>
@@ -39,7 +42,6 @@ using search::index::schema::CollectionType;
using search::index::schema::DataType;
using vespalib::makeLambdaTask;
using search::transactionlog::TransLogServer;
-using search::transactionlog::DomainConfig;
using storage::spi::PartitionId;
using storage::spi::RemoveResult;
using storage::spi::Result;
@@ -394,7 +396,7 @@ struct FeedHandlerFixture
FeedHandler handler;
FeedHandlerFixture()
: _fileHeaderContext(),
- tls("mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)),
+ tls("mytls", 9016, "mytlsdir", _fileHeaderContext, 0x10000),
tlsSpec("tcp/localhost:9016"),
writeService(),
schema(),
diff --git a/searchcore/src/tests/proton/server/feedstates_test.cpp b/searchcore/src/tests/proton/server/feedstates_test.cpp
index 5e6cb3fdc14..a6f3496e1ed 100644
--- a/searchcore/src/tests/proton/server/feedstates_test.cpp
+++ b/searchcore/src/tests/proton/server/feedstates_test.cpp
@@ -105,7 +105,7 @@ RemoveOperationContext::RemoveOperationContext(search::SerialNum serial)
{
op.serialize(str);
ConstBufferRef buf(str.c_str(), str.wp());
- packet.reset(new Packet(0xf000));
+ packet.reset(new Packet());
packet->add(Packet::Entry(serial, FeedOperation::REMOVE, buf));
}
RemoveOperationContext::~RemoveOperationContext() {}
diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
index 2aafbc8c2d8..bfc59dee35e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
@@ -15,8 +15,9 @@ void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type
const vespalib::nbostream &buf, DoneCallback onDone)
{
Packet::Entry entry(serialNum, type, vespalib::ConstBufferRef(buf.c_str(), buf.size()));
- Packet packet(entry.serializedSize());
+ Packet packet;
packet.add(entry);
+ packet.close();
_tlsDirectWriter.commit(_domain, packet, std::move(onDone));
}
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index f88b798370c..861023b79b7 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/vespalib/objects/identifiable.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/fastos/file.h>
+#include <map>
#include <vespa/log/log.h>
LOG_SETUP("translogclient_test");
@@ -13,24 +14,9 @@ using namespace search;
using namespace transactionlog;
using namespace document;
using namespace vespalib;
-using namespace std::chrono_literals;
using search::index::DummyFileHeaderContext;
-namespace {
-
-bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0);
-TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name);
-bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name);
-void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries);
-void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
-uint32_t countFiles(const vespalib::string &dir);
-void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
-bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
-void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains);
-void verifyDomain(const vespalib::string & name);
-
-vespalib::string
-myhex(const void * b, size_t sz)
+vespalib::string myhex(const void * b, size_t sz)
{
static const char * hextab="0123456789ABCDEF";
const unsigned char * c = static_cast<const unsigned char *>(b);
@@ -43,6 +29,35 @@ myhex(const void * b, size_t sz)
return s;
}
+class Test : public vespalib::TestApp
+{
+public:
+ int Main() override;
+private:
+ bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0);
+ TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name);
+ bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name);
+ void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries);
+ void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
+ uint32_t countFiles(const vespalib::string &dir);
+ void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
+ bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
+ bool partialUpdateTest();
+ bool testVisitOverGeneratedDomain();
+ bool testRemove();
+ void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains);
+ void verifyDomain(const vespalib::string & name);
+ void testCrcVersions();
+ bool testVisitOverPreExistingDomain();
+ void testMany();
+ void testErase();
+ void testSync();
+ void testTruncateOnShortRead();
+ void testTruncateOnVersionMismatch();
+};
+
+TEST_APPHOOK(Test);
+
class CallBackTest : public TransLogClient::Visitor::Callback
{
private:
@@ -60,8 +75,7 @@ public:
bool _eof;
};
-RPC::Result
-CallBackTest::receive(const Packet & p)
+RPC::Result CallBackTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().c_str(), p.getHandle().size());
LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(h.peek(), h.size()).c_str());
@@ -87,8 +101,7 @@ public:
size_t _value;
};
-RPC::Result
-CallBackManyTest::receive(const Packet & p)
+RPC::Result CallBackManyTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().c_str(), p.getHandle().size());
for(;h.size() > 0; _count++, _value++) {
@@ -120,8 +133,7 @@ public:
};
-RPC::Result
-CallBackUpdate::receive(const Packet & packet)
+RPC::Result CallBackUpdate::receive(const Packet & packet)
{
nbostream_longlivedbuf h(packet.getHandle().c_str(), packet.getHandle().size());
while (h.size() > 0) {
@@ -173,8 +185,7 @@ public:
SerialNum _prevSerial;
};
-RPC::Result
-CallBackStatsTest::receive(const Packet & p)
+RPC::Result CallBackStatsTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().c_str(), p.getHandle().size());
for(;h.size() > 0; ++_count) {
@@ -210,10 +221,67 @@ public:
IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable);
-constexpr size_t DEFAULT_PACKET_SIZE = 0xf000;
+bool Test::partialUpdateTest()
+{
+ bool retval(false);
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogClient tls("tcp/localhost:18377");
+
+ TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
+ TransLogClient::Session & session = *s1;
+
+ TestIdentifiable du;
+
+ nbostream os;
+ os << du;
+
+ vespalib::ConstBufferRef bb(os.c_str(), os.size());
+ LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str());
+ Packet::Entry e(7, du.getClass().id(), bb);
+ Packet pa;
+ pa.add(e);
+ pa.close();
+ ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().c_str(), pa.getHandle().size())));
+
+ CallBackUpdate ca;
+ TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca);
+ ASSERT_TRUE(visitor.get());
+ ASSERT_TRUE( visitor->visit(5, 7) );
+ for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
+ ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.map().size() == 1);
+ ASSERT_TRUE( ca.hasSerial(7) );
+
+ CallBackUpdate ca1;
+ TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1);
+ ASSERT_TRUE(visitor1.get());
+ ASSERT_TRUE( visitor1->visit(4, 5) );
+ for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
+ ASSERT_TRUE( ca1._eof );
+ ASSERT_TRUE( ca1.map().size() == 0);
+
+ CallBackUpdate ca2;
+ TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2);
+ ASSERT_TRUE(visitor2.get());
+ ASSERT_TRUE( visitor2->visit(5, 6) );
+ for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
+ ASSERT_TRUE( ca2._eof );
+ ASSERT_TRUE( ca2.map().size() == 0);
+
+ CallBackUpdate ca3;
+ TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3);
+ ASSERT_TRUE(visitor3.get());
+ ASSERT_TRUE( visitor3->visit(5, 1000) );
+ for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
+ ASSERT_TRUE( ca3._eof );
+ ASSERT_TRUE( ca3.map().size() == 1);
+ ASSERT_TRUE( ca3.hasSerial(7) );
-bool
-createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains)
+ return retval;
+}
+
+bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains)
{
bool retval(true);
std::vector<vespalib::string> dir;
@@ -230,40 +298,43 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre
return retval;
}
-TransLogClient::Session::UP
-openDomainTest(TransLogClient & tls, const vespalib::string & name)
+TransLogClient::Session::UP Test::openDomainTest(TransLogClient & tls, const vespalib::string & name)
{
TransLogClient::Session::UP s1 = tls.open(name);
ASSERT_TRUE (s1.get() != NULL);
return s1;
}
-bool
-fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
+bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
{
bool retval(true);
Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20));
Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20));
Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20));
- Packet a(DEFAULT_PACKET_SIZE);
- a.add(e1);
- Packet b(DEFAULT_PACKET_SIZE);
- b.add(e2);
- b.add(e3);
- EXPECT_EXCEPTION(b.add(e1), std::runtime_error, "");
+ Packet a;
+ ASSERT_TRUE (a.add(e1));
+ Packet b;
+ ASSERT_TRUE (b.add(e2));
+ ASSERT_TRUE (b.add(e3));
+ ASSERT_TRUE (!b.add(e1));
+ a.close();
+ b.close();
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size())));
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().c_str(), b.getHandle().size())));
- EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size())),
- std::runtime_error,
- "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3).");
+ try {
+ s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size()));
+ ASSERT_TRUE(false);
+ } catch (const std::exception & e) {
+ EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what());
+ }
EXPECT_EQUAL(a.size(), 1u);
EXPECT_EQUAL(a.range().from(), 1u);
EXPECT_EQUAL(a.range().to(), 1u);
EXPECT_EQUAL(b.size(), 2u);
EXPECT_EQUAL(b.range().from(), 2u);
EXPECT_EQUAL(b.range().to(), 3u);
- a.merge(b);
+ EXPECT_TRUE(a.merge(b));
EXPECT_EQUAL(a.size(), 3u);
EXPECT_EQUAL(a.range().from(), 1u);
EXPECT_EQUAL(a.range().to(), 3u);
@@ -278,80 +349,52 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
return retval;
}
-void
-fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries)
+void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries)
{
size_t value(0);
for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
+ std::unique_ptr<Packet> p(new Packet());
for(size_t j=0; j < numEntries; j++, value++) {
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
- p->add(e);
- if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
+ if ( ! p->add(e) ) {
+ p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size())));
- p.reset(new Packet(DEFAULT_PACKET_SIZE));
+ p.reset(new Packet());
+ ASSERT_TRUE(p->add(e));
}
}
+ p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size())));
}
}
-using Counter = std::atomic<size_t>;
-
-class CountDone : public IDestructorCallback {
-public:
- CountDone(Counter & inFlight) : _inFlight(inFlight) { ++_inFlight; }
- ~CountDone() override { --_inFlight; }
-private:
- Counter & _inFlight;
-};
-
-void
-fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries)
-{
- size_t value(0);
- Counter inFlight(0);
- for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
- for(size_t j=0; j < numEntries; j++, value++) {
- Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
- p->add(e);
- if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
- s1.commit(domain, *p, std::make_shared<CountDone>(inFlight));
- p.reset(new Packet(DEFAULT_PACKET_SIZE));
- }
- }
- s1.commit(domain, *p, std::make_shared<CountDone>(inFlight));
- LOG(info, "Inflight %ld", inFlight.load());
- }
- while (inFlight.load() != 0) {
- std::this_thread::sleep_for(10ms);
- LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load());
- }
-
-}
void
-fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
+Test::fillDomainTest(TransLogClient::Session * s1,
+ size_t numPackets, size_t numEntries,
+ size_t entrySize)
{
size_t value(0);
std::vector<char> entryBuffer(entrySize);
for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
+ std::unique_ptr<Packet> p(new Packet());
for(size_t j=0; j < numEntries; j++, value++) {
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size()));
- p->add(e);
- if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
+ if ( ! p->add(e) ) {
+ p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size())));
- p.reset(new Packet(DEFAULT_PACKET_SIZE));
+ p.reset(new Packet());
+ ASSERT_TRUE(p->add(e));
}
}
+ p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size())));
}
}
+
uint32_t
-countFiles(const vespalib::string &dir)
+Test::countFiles(const vespalib::string &dir)
{
uint32_t res = 0;
FastOS_DirectoryScan dirScan(dir.c_str());
@@ -365,8 +408,10 @@ countFiles(const vespalib::string &dir)
return res;
}
+
void
-checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
+Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1,
+ size_t numEntries)
{
SerialNum b(0), e(0);
size_t c(0);
@@ -376,8 +421,8 @@ checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
EXPECT_EQUAL(c, numEntries);
}
-bool
-visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name)
+
+bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name)
{
bool retval(true);
@@ -392,7 +437,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal
TransLogClient::Visitor::UP visitor = tls.createVisitor(name, ca);
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 1) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ca.hasSerial(1) );
@@ -402,7 +447,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal
visitor = tls.createVisitor(name, ca);
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(1, 2) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ! ca.hasSerial(1) );
@@ -413,7 +458,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal
visitor = tls.createVisitor(name, ca);
EXPECT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 3) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ca.hasSerial(1) );
@@ -424,7 +469,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal
visitor = tls.createVisitor(name, ca);
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(2, 3) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( !ca.hasSerial(1) );
@@ -441,31 +486,10 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain)
return tls.getDomainStats()[domain].maxSessionRunTime.count();
}
-void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains)
+bool Test::testVisitOverGeneratedDomain()
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test13", 18377, ".", fileHeaderContext,
- DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4);
- TransLogClient tls("tcp/localhost:18377");
-
- createDomainTest(tls, name, preExistingDomains);
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- fillDomainTest(s1.get(), name);
-}
-
-void verifyDomain(const vespalib::string & name) {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
- TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- visitDomainTest(tls, s1.get(), name);
-}
-
-}
-
-TEST("testVisitOverGeneratedDomain") {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test1");
@@ -477,85 +501,42 @@ TEST("testVisitOverGeneratedDomain") {
double maxSessionRunTime = getMaxSessionRunTime(tlss, "test1");
LOG(info, "testVisitOverGeneratedDomain(): maxSessionRunTime=%f", maxSessionRunTime);
EXPECT_GREATER(maxSessionRunTime, 0);
+ return true;
}
-TEST("testVisitOverPreExistingDomain") {
- // Depends on Test::testVisitOverGeneratedDomain()
+void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains)
+{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, crcMethod);
TransLogClient tls("tcp/localhost:18377");
- vespalib::string name("test1");
+ createDomainTest(tls, name, preExistingDomains);
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- visitDomainTest(tls, s1.get(), name);
+ fillDomainTest(s1.get(), name);
}
-TEST("partialUpdateTest") {
+void Test::verifyDomain(const vespalib::string & name)
+{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls("tcp/localhost:18377");
-
- TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
- TransLogClient::Session & session = *s1;
-
- TestIdentifiable du;
-
- nbostream os;
- os << du;
-
- vespalib::ConstBufferRef bb(os.c_str(), os.size());
- LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str());
- Packet::Entry e(7, du.getClass().id(), bb);
- Packet pa(DEFAULT_PACKET_SIZE);
- pa.add(e);
- ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().c_str(), pa.getHandle().size())));
-
- CallBackUpdate ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca);
- ASSERT_TRUE(visitor.get());
- ASSERT_TRUE( visitor->visit(5, 7) );
- for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
- ASSERT_TRUE( ca.map().size() == 1);
- ASSERT_TRUE( ca.hasSerial(7) );
-
- CallBackUpdate ca1;
- TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1);
- ASSERT_TRUE(visitor1.get());
- ASSERT_TRUE( visitor1->visit(4, 5) );
- for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca1._eof );
- ASSERT_TRUE( ca1.map().size() == 0);
-
- CallBackUpdate ca2;
- TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2);
- ASSERT_TRUE(visitor2.get());
- ASSERT_TRUE( visitor2->visit(5, 6) );
- for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca2._eof );
- ASSERT_TRUE( ca2.map().size() == 0);
-
- CallBackUpdate ca3;
- TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3);
- ASSERT_TRUE(visitor3.get());
- ASSERT_TRUE( visitor3->visit(5, 1000) );
- for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca3._eof );
- ASSERT_TRUE( ca3.map().size() == 1);
- ASSERT_TRUE( ca3.hasSerial(7) );
+ TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ visitDomainTest(tls, s1.get(), name);
}
-TEST("testCrcVersions") {
- createAndFillDomain("ccitt_crc32", Encoding::Crc::ccitt_crc32, 0);
- createAndFillDomain("xxh64", Encoding::Crc::xxh64, 1);
+void Test::testCrcVersions()
+{
+ createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0);
+ createAndFillDomain("xxh64", DomainPart::xxh64, 1);
verifyDomain("ccitt_crc32");
verifyDomain("xxh64");
}
-TEST("testRemove") {
+bool Test::testRemove()
+{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test-delete");
@@ -564,6 +545,21 @@ TEST("testRemove") {
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
ASSERT_TRUE(tls.remove(name));
+
+ return true;
+}
+
+bool Test::testVisitOverPreExistingDomain()
+{
+ // Depends on Test::testVisitOverGeneratedDomain()
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogClient tls("tcp/localhost:18377");
+
+ vespalib::string name("test1");
+ TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ visitDomainTest(tls, s1.get(), name);
+ return true;
}
namespace {
@@ -579,7 +575,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain,
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(visitStart, visitEnd) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) {
- std::this_thread::sleep_for(10ms);
+ FastOS_Thread::Sleep(10);
}
ASSERT_TRUE(ca._eof);
EXPECT_EQUAL(expFirstSerial, ca._firstSerial);
@@ -604,19 +600,18 @@ assertStatus(TransLogClient::Session &s,
}
-TEST("test sending a lot of data") {
+void Test::testMany()
+{
const unsigned int NUM_PACKETS = 1000;
const unsigned int NUM_ENTRIES = 100;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
- const vespalib::string MANY("many");
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)
- .setChunkAgeLimit(100us));
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000);
TransLogClient tls("tcp/localhost:18377");
- createDomainTest(tls, MANY, 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ createDomainTest(tls, "many", 0);
+ TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
SerialNum b(0), e(0);
size_t c(0);
@@ -625,52 +620,20 @@ TEST("test sending a lot of data") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
- ASSERT_TRUE(visitor.get());
- ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
- EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
- }
- {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
- TransLogClient tls("tcp/localhost:18377");
-
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
- SerialNum b(0), e(0);
- size_t c(0);
- EXPECT_TRUE(s1->status(b, e, c));
- EXPECT_EQUAL(b, 1u);
- EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
- CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
ASSERT_TRUE( ca._eof );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
}
-}
-
-TEST("test sending a lot of data async") {
- const unsigned int NUM_PACKETS = 1000;
- const unsigned int NUM_ENTRIES = 100;
- const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
- const vespalib::string MANY("many-async");
-
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)
- .setChunkAgeLimit(10ms));
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
- createDomainTest(tls, MANY, 1);
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
- fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES);
+ TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
SerialNum b(0), e(0);
size_t c(0);
EXPECT_TRUE(s1->status(b, e, c));
@@ -678,44 +641,24 @@ TEST("test sending a lot of data async") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
- EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
- }
- {
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
- TransLogClient tls("tcp/localhost:18377");
-
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
- SerialNum b(0), e(0);
- size_t c(0);
- EXPECT_TRUE(s1->status(b, e, c));
- EXPECT_EQUAL(b, 1u);
- EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
- EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
- CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
- ASSERT_TRUE(visitor.get());
- ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
ASSERT_TRUE( ca._eof );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
}
}
-TEST("testErase") {
+void Test::testErase()
+{
const unsigned int NUM_PACKETS = 1000;
const unsigned int NUM_ENTRIES = 100;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000));
+ TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000);
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "erase", 0);
@@ -724,7 +667,7 @@ TEST("testErase") {
}
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "erase");
@@ -805,13 +748,16 @@ TEST("testErase") {
}
}
-TEST("testSync") {
+
+void
+Test::testSync()
+{
const unsigned int NUM_PACKETS = 3;
const unsigned int NUM_ENTRIES = 4;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
@@ -824,7 +770,10 @@ TEST("testSync") {
EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES);
}
-TEST("test truncate on version mismatch") {
+
+void
+Test::testTruncateOnVersionMismatch()
+{
const unsigned int NUM_PACKETS = 3;
const unsigned int NUM_ENTRIES = 4;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
@@ -833,7 +782,7 @@ TEST("test truncate on version mismatch") {
size_t countOld(0);
DummyFileHeaderContext fileHeaderContext;
{
- TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
@@ -854,7 +803,7 @@ TEST("test truncate on version mismatch") {
EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp)));
EXPECT_TRUE(f.Close());
{
- TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000);
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
uint64_t from(0), to(0);
@@ -866,7 +815,9 @@ TEST("test truncate on version mismatch") {
}
}
-TEST("test trucation after short read") {
+void
+Test::testTruncateOnShortRead()
+{
const unsigned int NUM_PACKETS = 17;
const unsigned int NUM_ENTRIES = 1;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
@@ -878,7 +829,7 @@ TEST("test trucation after short read") {
DummyFileHeaderContext fileHeaderContext;
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls(tlsspec);
createDomainTest(tls, domain, 0);
@@ -894,7 +845,7 @@ TEST("test trucation after short read") {
EXPECT_EQUAL(2u, countFiles(dir));
}
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls(tlsspec);
TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES);
@@ -910,7 +861,7 @@ TEST("test trucation after short read") {
trfile.Close();
}
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
TransLogClient tls(tlsspec);
TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1);
@@ -920,4 +871,28 @@ TEST("test trucation after short read") {
}
}
-TEST_MAIN() { TEST_RUN_ALL(); }
+
+int Test::Main()
+{
+ TEST_INIT("translogclient_test");
+
+ if (_argc > 0) {
+ DummyFileHeaderContext::setCreator(_argv[0]);
+ }
+ testVisitOverGeneratedDomain();
+ testVisitOverPreExistingDomain();
+ testMany();
+ testErase();
+ partialUpdateTest();
+
+ testRemove();
+
+ testSync();
+
+ testTruncateOnShortRead();
+ testTruncateOnVersionMismatch();
+
+ testCrcVersions();
+
+ TEST_DONE();
+}
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index 6f2581d3799..abba84b75b6 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -8,6 +8,7 @@
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/fastos/app.h>
#include <iostream>
+#include <stdexcept>
#include <sstream>
#include <vespa/log/log.h>
@@ -220,6 +221,7 @@ FeederThread::~FeederThread() {}
void
FeederThread::commitPacket()
{
+ _packet.close();
const vespalib::nbostream& stream = _packet.getHandle();
if (!_session->commit(ConstBufferRef(stream.c_str(), stream.size()))) {
throw std::runtime_error(vespalib::make_string
@@ -234,9 +236,8 @@ FeederThread::commitPacket()
bool
FeederThread::addEntry(const Packet::Entry & e)
{
- if (_packet.sizeBytes() > 0xf000) return false;
- _packet.add(e);
- return true;
+ //LOG(info, "FeederThread: add %s", EntryPrinter::toStr(e).c_str());
+ return _packet.add(e);
}
void
@@ -698,7 +699,7 @@ TransLogStress::Main()
// start transaction log server
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize));
+ TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize);
TransLogClient client(tlsSpec);
client.create(domain);
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def
index 81ab5f6e4fa..74efe3fe68e 100644
--- a/searchlib/src/vespa/searchlib/config/translogserver.def
+++ b/searchlib/src/vespa/searchlib/config/translogserver.def
@@ -5,7 +5,7 @@ namespace=searchlib
listenport int default=13700 restart
## Max file size (50M)
-filesizemax int default=50000000
+filesizemax int default=50000000 restart
## Server name to identify server.
servername string default="tls" restart
@@ -22,17 +22,3 @@ maxthreads int default=4 restart
##Default crc method used
crcmethod enum {ccitt_crc32, xxh64} default=xxh64
-
-## Control compression type.
-compression.type enum {NONE, LZ4, ZSTD} default=LZ4
-
-## Control compression level
-## LZ4 has normal range 1..9 while ZSTD has range 1..19
-## 9 is a reasonable default for both
-compression.level int default=9
-
-## How large a chunk can grow in memory before beeing flushed
-chunk.sizelimit int default = 256000 # 256k
-
-## How long a chunk can reside in memory befor ebeeing flushed to disk.
-chunk.agelimit double default = 0.010 # 10 milliseconds
diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
index 0755d07b403..d964c88fe29 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
@@ -1,11 +1,9 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(searchlib_transactionlog OBJECT
SOURCES
- chunks.cpp
common.cpp
domain.cpp
domainpart.cpp
- ichunk.cpp
nosyncproxy.cpp
session.cpp
trans_log_server_explorer.cpp
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp b/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
deleted file mode 100644
index dea0ccf9b9a..00000000000
--- a/searchlib/src/vespa/searchlib/transactionlog/chunks.cpp
+++ /dev/null
@@ -1,128 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "chunks.h"
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/compressor.h>
-#include <vespa/vespalib/data/databuffer.h>
-
-using std::runtime_error;
-using std::make_unique;
-using vespalib::make_string;
-using vespalib::compression::compress;
-using vespalib::compression::decompress;
-using vespalib::compression::CompressionConfig;
-using vespalib::DataBuffer;
-using vespalib::ConstBufferRef;
-using vespalib::nbostream;
-
-namespace search::transactionlog {
-
-namespace {
-void verifyCrc(nbostream & is, Encoding::Crc crcType) {
- if (is.size() < sizeof(int32_t) * 2) {
- throw runtime_error(make_string("Not even room for the crc and length. Only %zu bytes left", is.size()));
- }
- size_t start = is.rp();
- is.adjustReadPos(is.size() - sizeof(int32_t));
- int32_t crc(0);
- is >> crc;
- is.rp(start);
- int32_t crcVerify = Encoding::calcCrc(crcType, is.c_str(), is.size() - sizeof(crc));
- if (crc != crcVerify) {
- throw runtime_error(make_string("Got bad crc : crcVerify = %d, expected %d",
- static_cast<int>(crcVerify), static_cast<int>(crc)));
- }
- is.rp(start);
-}
-
-Encoding::Compression
-toCompression(CompressionConfig::Type type) {
- switch (type) {
- case CompressionConfig::ZSTD:
- return Encoding::Compression::zstd;
- case CompressionConfig::LZ4:
- return Encoding::Compression::lz4;
- case CompressionConfig::NONE:
- return Encoding::Compression::none;
- default:
- abort();
- }
-}
-
-}
-
-Encoding
-CCITTCRC32None::onEncode(nbostream &os) const {
- size_t start = os.wp();
- assert(getEntries().size() == 1);
- serializeEntries(os);
- os << int32_t(Encoding::calcCrc(Encoding::Crc::ccitt_crc32, os.c_str()+start, os.size() - start));
- return Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none);
-}
-
-void CCITTCRC32None::onDecode(nbostream &is) {
- verifyCrc(is, Encoding::Crc::ccitt_crc32);
- nbostream data(is.peek(), is.size() - sizeof(int32_t));
- deserializeEntries(data);
- is.adjustReadPos(is.size());
-}
-
-Encoding
-XXH64None::onEncode(nbostream &os) const {
- size_t start = os.wp();
- assert(getEntries().size() == 1);
- serializeEntries(os);
- os << int32_t(Encoding::calcCrc(Encoding::Crc::xxh64, os.c_str()+start, os.size() - start));
- return Encoding(Encoding::Crc::xxh64, Encoding::Compression::none);
-}
-
-void XXH64None::onDecode(nbostream &is) {
- verifyCrc(is, Encoding::Crc::xxh64);
- nbostream data(is.peek(), is.size() - sizeof(int32_t));
- deserializeEntries(data);
- is.adjustReadPos(is.size());
-}
-
-void
-XXH64Compressed::decompress(nbostream & is) {
- uint32_t uncompressedLen;
- is >> uncompressedLen;
- vespalib::DataBuffer uncompressed;
- ConstBufferRef compressed(is.peek(), is.size()-sizeof(uint32_t)*2);
- ::decompress(_type, uncompressedLen, compressed, uncompressed, false);
- nbostream data(uncompressed.getData(), uncompressed.getDataLen());
- deserializeEntries(data);
- is.adjustReadPos(is.size());
-}
-
-XXH64Compressed::XXH64Compressed(CompressionConfig::Type type, uint8_t level)
- : _type(type),
- _level(level)
-{ }
-
-Encoding
-XXH64Compressed::compress(nbostream & os, Encoding::Crc crc) const {
- nbostream org;
- serializeEntries(org);
- DataBuffer compressed;
- CompressionConfig cfg(_type, _level, 80);
- ConstBufferRef uncompressed(org.c_str(), org.size());
- Encoding::Compression actual = toCompression(::compress(cfg, uncompressed, compressed, false));
- size_t start = os.wp();
- os.write(compressed.getData(), compressed.getDataLen());
- os << int32_t(Encoding::calcCrc(crc, os.c_str()+start, os.size() - start));
- return Encoding(Encoding::Crc::xxh64, actual);
-}
-
-Encoding
-XXH64Compressed::onEncode(IChunk::nbostream &os) const {
- return compress(os, Encoding::Crc::xxh64);
-}
-
-void
-XXH64Compressed::onDecode(IChunk::nbostream &is) {
- verifyCrc(is, Encoding::Crc::xxh64);
- decompress(is);
-}
-
-}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/chunks.h b/searchlib/src/vespa/searchlib/transactionlog/chunks.h
deleted file mode 100644
index cf88bc0a3ed..00000000000
--- a/searchlib/src/vespa/searchlib/transactionlog/chunks.h
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "ichunk.h"
-#include <vespa/vespalib/util/compressionconfig.h>
-
-namespace search::transactionlog {
-
-class XXH64None : public IChunk {
-protected:
- Encoding onEncode(nbostream &os) const override;
- void onDecode(nbostream &is) override;
-public:
-};
-
-class CCITTCRC32None : public IChunk {
-protected:
- Encoding onEncode(nbostream &os) const override;
- void onDecode(nbostream &is) override;
-public:
-};
-
-class XXH64Compressed : public IChunk {
-public:
- using CompressionConfig = vespalib::compression::CompressionConfig;
- XXH64Compressed(CompressionConfig::Type, uint8_t level);
-protected:
- void decompress(nbostream & os);
- Encoding compress(nbostream & os, Encoding::Crc crc) const;
- Encoding onEncode(nbostream &os) const override;
- void onDecode(nbostream &is) override;
-private:
- CompressionConfig::Type _type;
- uint8_t _level;
-};
-
-}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.cpp b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
index 9bf43c8e244..a84e27b2e53 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.cpp
@@ -1,29 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "common.h"
-#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fastos/file.h>
namespace search::transactionlog {
using vespalib::nbostream;
using vespalib::nbostream_longlivedbuf;
-using vespalib::make_string;
-using std::runtime_error;
-namespace {
-
-void throwRangeError(SerialNum prev, SerialNum next) __attribute__((noinline));
-
-void throwRangeError(SerialNum prev, SerialNum next) {
- if (prev < next) return;
- throw runtime_error(make_string("The new serialnum %zu is not higher than the old one %zu", next, prev));
-}
-
-}
-
-int
-makeDirectory(const char * dir)
+int makeDirectory(const char * dir)
{
int retval(-1);
@@ -37,8 +22,7 @@ makeDirectory(const char * dir)
return retval;
}
-int64_t
-SerialNumRange::cmp(const SerialNumRange & b) const
+int64_t SerialNumRange::cmp(const SerialNumRange & b) const
{
int64_t diff(0);
if ( ! (contains(b) || b.contains(*this)) ) {
@@ -50,6 +34,7 @@ SerialNumRange::cmp(const SerialNumRange & b) const
Packet::Packet(const void * buf, size_t sz) :
_count(0),
_range(),
+ _limit(sz),
_buf(static_cast<const char *>(buf), sz)
{
nbostream_longlivedbuf os(_buf.c_str(), sz);
@@ -64,22 +49,18 @@ Packet::Packet(const void * buf, size_t sz) :
}
}
-void
-Packet::merge(const Packet & packet)
+bool Packet::merge(const Packet & packet)
{
- if (_range.to() >= packet.range().from()) {
- throwRangeError(_range.to(), packet.range().from());
- }
- if (_buf.empty()) {
- _range.from(packet.range().from());
+ bool retval(_range.to() < packet._range.from());
+ if (retval) {
+ _count += packet._count;
+ _range.to(packet._range.to());
+ _buf.write(packet.getHandle().c_str(), packet.getHandle().size());
}
- _count += packet._count;
- _range.to(packet._range.to());
- _buf.write(packet.getHandle().c_str(), packet.getHandle().size());
+ return retval;
}
-nbostream &
-Packet::Entry::deserialize(nbostream & os)
+nbostream & Packet::Entry::deserialize(nbostream & os)
{
_valid = false;
int32_t len(0);
@@ -90,8 +71,7 @@ Packet::Entry::deserialize(nbostream & os)
return os;
}
-nbostream &
-Packet::Entry::serialize(nbostream & os) const
+nbostream & Packet::Entry::serialize(nbostream & os) const
{
os << _unique << _type << static_cast<uint32_t>(_data.size());
os.write(_data.c_str(), _data.size());
@@ -103,21 +83,22 @@ Packet::Entry::Entry(SerialNum u, Type t, const vespalib::ConstBufferRef & d) :
_type(t),
_valid(true),
_data(d)
-{ }
-
-void
-Packet::add(const Packet::Entry & e)
{
- if (_range.to() >= e.serial()) {
- throwRangeError(_range.to(), e.serial());
- }
+}
+
- if (_buf.empty()) {
- _range.from(e.serial());
+bool Packet::add(const Packet::Entry & e)
+{
+ bool retval((_buf.size() < _limit) && (_range.to() < e.serial()));
+ if (retval) {
+ if (_buf.empty()) {
+ _range.from(e.serial());
+ }
+ e.serialize(_buf);
+ _count++;
+ _range.to(e.serial());
}
- e.serialize(_buf);
- _count++;
- _range.to(e.serial());
+ return retval;
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index 0deceb2668a..db8b9727daa 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -69,19 +69,21 @@ public:
vespalib::ConstBufferRef _data;
};
public:
- Packet(size_t reserved) : _count(0), _range(), _buf(reserved) { }
+ Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { }
Packet(const void * buf, size_t sz);
- void add(const Entry & data);
+ bool add(const Entry & data);
+ void close() { }
void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); }
const SerialNumRange & range() const { return _range; }
const vespalib::nbostream & getHandle() const { return _buf; }
size_t size() const { return _count; }
bool empty() const { return _count == 0; }
size_t sizeBytes() const { return _buf.size(); }
- void merge(const Packet & packet);
+ bool merge(const Packet & packet);
private:
size_t _count;
SerialNumRange _range;
+ size_t _limit;
vespalib::nbostream_longlivedbuf _buf;
};
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index e29adb56dd4..88c2dd9ecc3 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -20,41 +20,28 @@ using vespalib::MonitorGuard;
using search::common::FileHeaderContext;
using std::runtime_error;
using namespace std::chrono_literals;
-using namespace std::chrono;
-using std::make_shared;
namespace search::transactionlog {
-DomainConfig::DomainConfig()
- : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none),
- _compressionLevel(9),
- _partSizeLimit(0x10000000), // 256M
- _chunkSizeLimit(0x40000), // 256k
- _chunkAgeLimit(10ms)
-{ }
-Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool,
- Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg,
- const FileHeaderContext &fileHeaderContext)
- : _config(cfg),
- _currentChunk(std::make_unique<Chunk>()),
- _lastSerial(0),
- _threadPool(threadPool),
- _commitExecutor(commitExecutor),
- _sessionExecutor(sessionExecutor),
- _sessionId(1),
- _syncMonitor(),
- _pendingSync(false),
- _name(domainName),
- _parts(),
- _lock(),
- _currentChunkMonitor(),
- _sessionLock(),
- _sessions(),
- _maxSessionRunTime(),
- _baseDir(baseDir),
- _fileHeaderContext(fileHeaderContext),
- _markedDeleted(false),
- _self(nullptr)
+Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor,
+ Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType,
+ const FileHeaderContext &fileHeaderContext) :
+ _defaultCrcType(defaultCrcType),
+ _commitExecutor(commitExecutor),
+ _sessionExecutor(sessionExecutor),
+ _sessionId(1),
+ _syncMonitor(),
+ _pendingSync(false),
+ _name(domainName),
+ _domainPartSize(domainPartSize),
+ _parts(),
+ _lock(),
+ _sessionLock(),
+ _sessions(),
+ _maxSessionRunTime(),
+ _baseDir(baseDir),
+ _fileHeaderContext(fileHeaderContext),
+ _markedDeleted(false)
{
int retval(0);
if ((retval = makeDirectory(_baseDir.c_str())) != 0) {
@@ -72,34 +59,12 @@ Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPo
}
_sessionExecutor.sync();
if (_parts.empty() || _parts.crbegin()->second->isClosed()) {
- _parts[lastPart] = make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, false);
+ _parts[lastPart].reset(new DomainPart(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false));
}
- _lastSerial = end();
- _self = _threadPool.NewThread(this);
- assert(_self);
}
-Domain &
-Domain::setConfig(const DomainConfig & cfg) {
- _config = cfg;
- return *this;
-}
-
-void
-Domain::Run(FastOS_ThreadInterface *thisThread, void *) {
-
- while (!thisThread->GetBreakFlag()) {
- vespalib::MonitorGuard guard(_currentChunkMonitor);
- guard.wait(duration_cast<milliseconds>(_config.getChunkAgeLimit()).count());
- commitIfStale(guard);
- }
-}
-
-void
-Domain::addPart(int64_t partId, bool isLastPart) {
- auto dp = make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, isLastPart);
+void Domain::addPart(int64_t partId, bool isLastPart) {
+ DomainPart::SP dp(new DomainPart(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart));
if (dp->size() == 0) {
// Only last domain part is allowed to be truncated down to
// empty size.
@@ -137,16 +102,7 @@ private:
bool & _pendingSync;
};
-Domain::~Domain() {
- if (_self) {
- _self->SetBreakFlag();
- {
- MonitorGuard guard(_currentChunkMonitor);
- guard.broadcast();
- }
- _self->Join();
- }
-}
+Domain::~Domain() { }
DomainInfo
Domain::getDomainInfo() const
@@ -173,7 +129,7 @@ Domain::begin(const LockGuard & guard) const
assert(guard.locks(_lock));
SerialNum s(0);
if ( ! _parts.empty() ) {
- s = _parts.cbegin()->second->range().from();
+ s = _parts.begin()->second->range().from();
}
return s;
}
@@ -191,7 +147,7 @@ Domain::end(const LockGuard & guard) const
assert(guard.locks(_lock));
SerialNum s(0);
if ( ! _parts.empty() ) {
- s = _parts.crbegin()->second->range().to();
+ s = _parts.rbegin()->second->range().to();
}
return s;
}
@@ -245,8 +201,7 @@ Domain::triggerSyncNow()
}
}
-DomainPart::SP
-Domain::findPart(SerialNum s)
+DomainPart::SP Domain::findPart(SerialNum s)
{
LockGuard guard(_lock);
DomainPartList::iterator it(_parts.upper_bound(s));
@@ -263,14 +218,12 @@ Domain::findPart(SerialNum s)
return DomainPart::SP();
}
-uint64_t
-Domain::size() const
+uint64_t Domain::size() const
{
return size(LockGuard(_lock));
}
-uint64_t
-Domain::size(const LockGuard & guard) const
+uint64_t Domain::size(const LockGuard & guard) const
{
(void) guard;
assert(guard.locks(_lock));
@@ -281,8 +234,7 @@ Domain::size(const LockGuard & guard) const
return sz;
}
-SerialNum
-Domain::findOldestActiveVisit() const
+SerialNum Domain::findOldestActiveVisit() const
{
SerialNum oldestActive(std::numeric_limits<SerialNum>::max());
LockGuard guard(_sessionLock);
@@ -295,8 +247,7 @@ Domain::findOldestActiveVisit() const
return oldestActive;
}
-void
-Domain::cleanSessions()
+void Domain::cleanSessions()
{
if ( _sessions.empty()) {
return;
@@ -316,8 +267,7 @@ Domain::cleanSessions()
namespace {
-void
-waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
+void waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
{
MonitorGuard guard(syncMonitor);
while (pendingSync) {
@@ -327,86 +277,18 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
}
-Domain::Chunk::Chunk()
- : _data(size_t(-1)),
- _callBacks(),
- _firstArrivalTime()
-{}
-
-Domain::Chunk::~Chunk() = default;
-
-void
-Domain::Chunk::add(const Packet &packet, Writer::DoneCallback onDone) {
- if (_callBacks.empty()) {
- _firstArrivalTime = steady_clock::now();
- }
- _data.merge(packet);
- _callBacks.emplace_back(std::move(onDone));
-}
-
-microseconds
-Domain::Chunk::age() const {
- if (_callBacks.empty()) {
- return 0ms;
- }
- return duration_cast<microseconds>(steady_clock::now() - _firstArrivalTime);
-}
-
-void
-Domain::commit(const Packet & packet, Writer::DoneCallback onDone) {
- vespalib::MonitorGuard guard(_currentChunkMonitor);
- if (! (_lastSerial < packet.range().from())) {
- throw runtime_error(make_string("Incomming serial number(%ld) must be bigger than the last one (%ld).",
- packet.range().from(), _lastSerial));
- } else {
- _lastSerial = packet.range().to();
- }
- _currentChunk->add(packet, std::move(onDone));
- commitIfFull(guard);
-}
-
-void
-Domain::commitIfFull(const vespalib::MonitorGuard &guard) {
- if (_currentChunk->sizeBytes() > _config.getChunkSizeLimit()) {
- auto completed = grabCurrentChunk(guard);
- if (completed) {
- commitChunk(std::move(completed), guard);
- }
- }
-}
-
-std::unique_ptr<Domain::Chunk>
-Domain::grabCurrentChunk(const vespalib::MonitorGuard & guard) {
- assert(guard.monitors(_currentChunkMonitor));
- auto chunk = std::move(_currentChunk);
- _currentChunk = std::make_unique<Chunk>();
- return chunk;
-}
-
-void
-Domain::commitIfStale(const vespalib::MonitorGuard & guard) {
- assert(guard.monitors(_currentChunkMonitor));
- if (_currentChunk->age() > _config.getChunkAgeLimit()) {
- commitChunk(grabCurrentChunk(guard), guard);
- }
-}
-
-void
-Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard)
+void Domain::commit(const Packet & packet)
{
- assert(chunkOrderGuard.monitors(_currentChunkMonitor));
- const Packet & packet = chunk->getPacket();
DomainPart::SP dp(_parts.rbegin()->second);
vespalib::nbostream_longlivedbuf is(packet.getHandle().c_str(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
- if (dp->byteSize() > _config.getPartSizeLimit()) {
+ if (dp->byteSize() > _domainPartSize) {
waitPendingSync(_syncMonitor, _pendingSync);
triggerSyncNow();
waitPendingSync(_syncMonitor, _pendingSync);
dp->close();
- dp = make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(),
- _config.getCompressionlevel(), _fileHeaderContext, false);
+ dp.reset(new DomainPart(_name, dir(), entry.serial(), _defaultCrcType, _fileHeaderContext, false));
{
LockGuard guard(_lock);
_parts[entry.serial()] = dp;
@@ -417,8 +299,7 @@ Domain::commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard &
cleanSessions();
}
-bool
-Domain::erase(SerialNum to)
+bool Domain::erase(SerialNum to)
{
bool retval(true);
/// Do not erase the last element
@@ -436,9 +317,8 @@ Domain::erase(SerialNum to)
return retval;
}
-int
-Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to,
- FRT_Supervisor & supervisor, FNET_Connection *conn)
+int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to,
+ FRT_Supervisor & supervisor, FNET_Connection *conn)
{
assert(this == domain.get());
cleanSessions();
@@ -449,8 +329,7 @@ Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to,
return session->id();
}
-int
-Domain::startSession(int sessionId)
+int Domain::startSession(int sessionId)
{
int retval(-1);
LockGuard guard(_sessionLock);
@@ -466,8 +345,7 @@ Domain::startSession(int sessionId)
return retval;
}
-int
-Domain::closeSession(int sessionId)
+int Domain::closeSession(int sessionId)
{
_commitExecutor.sync();
int retval(-1);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index 2690f32471b..c1ff9157a6f 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -8,39 +8,16 @@
namespace search::transactionlog {
-class DomainConfig {
-public:
- using microseconds = std::chrono::microseconds;
- DomainConfig();
- DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; }
- DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; }
- DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; }
- DomainConfig & setChunkAgeLimit(microseconds v) { _chunkAgeLimit = v; return *this; }
- DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; }
- Encoding getEncoding() const { return _encoding; }
- size_t getPartSizeLimit() const { return _partSizeLimit; }
- size_t getChunkSizeLimit() const { return _chunkSizeLimit; }
- microseconds getChunkAgeLimit() const { return _chunkAgeLimit; }
- uint8_t getCompressionlevel() const { return _compressionLevel; }
-private:
- Encoding _encoding;
- uint8_t _compressionLevel;
- size_t _partSizeLimit;
- size_t _chunkSizeLimit;
- microseconds _chunkAgeLimit;
-};
-
struct PartInfo {
SerialNumRange range;
size_t numEntries;
size_t byteSize;
vespalib::string file;
- PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in)
- : range(range_in),
- numEntries(numEntries_in),
- byteSize(byteSize_in),
- file(file_in)
- {}
+ PartInfo(SerialNumRange range_in, size_t numEntries_in,
+ size_t byteSize_in,
+ vespalib::stringref file_in)
+ : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in),
+ file(file_in) {}
};
struct DomainInfo {
@@ -58,22 +35,22 @@ struct DomainInfo {
typedef std::map<vespalib::string, DomainInfo> DomainStats;
-class Domain final : public FastOS_Runnable
+class Domain
{
public:
using SP = std::shared_ptr<Domain>;
using Executor = vespalib::ThreadExecutor;
- Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool,
- Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg,
+ Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor,
+ Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType,
const common::FileHeaderContext &fileHeaderContext);
- ~Domain() override;
+ virtual ~Domain();
DomainInfo getDomainInfo() const;
const vespalib::string & name() const { return _name; }
bool erase(SerialNum to);
- void commit(const Packet & packet, Writer::DoneCallback onDone);
+ void commit(const Packet & packet);
int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn);
SerialNum begin() const;
@@ -100,27 +77,8 @@ public:
return _sessionExecutor.execute(std::move(task));
}
uint64_t size() const;
- Domain & setConfig(const DomainConfig & cfg);
+
private:
- void Run(FastOS_ThreadInterface *thisThread, void *arguments) override;
- void commitIfStale(const vespalib::MonitorGuard & guard);
- void commitIfFull(const vespalib::MonitorGuard & guard);
- class Chunk {
- public:
- Chunk();
- ~Chunk();
- void add(const Packet & packet, Writer::DoneCallback onDone);
- size_t sizeBytes() const { return _data.sizeBytes(); }
- const Packet & getPacket() const { return _data; }
- std::chrono::microseconds age() const;
- private:
- Packet _data;
- std::vector<Writer::DoneCallback> _callBacks;
- std::chrono::steady_clock::time_point _firstArrivalTime;
- };
-
- std::unique_ptr<Chunk> grabCurrentChunk(const vespalib::MonitorGuard & guard);
- void commitChunk(std::unique_ptr<Chunk> chunk, const vespalib::MonitorGuard & chunkOrderGuard);
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
size_t byteSize(const vespalib::LockGuard & guard) const;
@@ -137,26 +95,22 @@ private:
using DomainPartList = std::map<int64_t, DomainPart::SP>;
using DurationSeconds = std::chrono::duration<double>;
- DomainConfig _config;
- std::unique_ptr<Chunk> _currentChunk;
- SerialNum _lastSerial;
- FastOS_ThreadPool & _threadPool;
- Executor & _commitExecutor;
- Executor & _sessionExecutor;
- std::atomic<int> _sessionId;
- vespalib::Monitor _syncMonitor;
- bool _pendingSync;
- vespalib::string _name;
- DomainPartList _parts;
- vespalib::Lock _lock;
- vespalib::Monitor _currentChunkMonitor;
- vespalib::Lock _sessionLock;
- SessionList _sessions;
- DurationSeconds _maxSessionRunTime;
- vespalib::string _baseDir;
+ DomainPart::Crc _defaultCrcType;
+ Executor & _commitExecutor;
+ Executor & _sessionExecutor;
+ std::atomic<int> _sessionId;
+ vespalib::Monitor _syncMonitor;
+ bool _pendingSync;
+ vespalib::string _name;
+ uint64_t _domainPartSize;
+ DomainPartList _parts;
+ vespalib::Lock _lock;
+ vespalib::Lock _sessionLock;
+ SessionList _sessions;
+ DurationSeconds _maxSessionRunTime;
+ vespalib::string _baseDir;
const common::FileHeaderContext &_fileHeaderContext;
- bool _markedDeleted;
- FastOS_ThreadInterface * _self;
+ bool _markedDeleted;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index f140fb93b96..35bdc71c963 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -2,6 +2,7 @@
#include "domainpart.h"
#include <vespa/vespalib/util/crc.h>
+#include <vespa/vespalib/xxhash/xxhash.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/data/fileheader.h>
#include <vespa/searchlib/common/fileheadercontext.h>
@@ -26,26 +27,37 @@ namespace search::transactionlog {
namespace {
-constexpr size_t TARGET_PACKET_SIZE = 0x3f000;
+void
+handleSync(FastOS_FileInterface &file) __attribute__ ((noinline));
string
-handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos,
- SerialNumRange range, int bufLen) __attribute__ ((noinline));
+handleWriteError(const char *text,
+ FastOS_FileInterface &file,
+ int64_t lastKnownGoodPos,
+ const Packet::Entry &entry,
+ int bufLen) __attribute__ ((noinline));
bool
-handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen,
- int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline));
+handleReadError(const char *text,
+ FastOS_FileInterface &file,
+ ssize_t len,
+ ssize_t rlen,
+ int64_t lastKnownGoodPos,
+ bool allowTruncate) __attribute__ ((noinline));
-void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline));
-void addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline));
-bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline));
+bool
+addPacket(Packet &packet,
+ const Packet::Entry &e) __attribute__ ((noinline));
-void
+bool
+tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline));
+
+bool
addPacket(Packet &packet, const Packet::Entry &e)
{
LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes",
e.serial(), e.type(), e.data().size(), packet.size(), packet.sizeBytes());
- packet.add(e);
+ return ! packet.add(e);
}
void
@@ -60,18 +72,21 @@ handleSync(FastOS_FileInterface &file)
}
string
-handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos,
- SerialNumRange range, int bufLen)
+handleWriteError(const char *text,
+ FastOS_FileInterface &file,
+ int64_t lastKnownGoodPos,
+ const Packet::Entry &entry,
+ int bufLen)
{
string last(FastOS_File::getLastErrorString());
- string e(make_string("%s. File '%s' at position %" PRId64 " for entries [%zu, %zu] of length %u. "
- "OS says '%s'. Rewind to last known good position %zu.",
- text, file.GetFileName(), file.GetPosition(), range.from(), range.to(), bufLen,
+ string e(make_string("%s. File '%s' at position %" PRId64 " for entry %" PRIu64 " of length %u. "
+ "OS says '%s'. Rewind to last known good position %" PRId64 ".",
+ text, file.GetFileName(), file.GetPosition(), entry.serial(), bufLen,
last.c_str(), lastKnownGoodPos));
LOG(error, "%s", e.c_str());
if ( ! file.SetPosition(lastKnownGoodPos) ) {
last = FastOS_File::getLastErrorString();
- throw runtime_error(make_string("Failed setting position %zu of file '%s' of size %zd : OS says '%s'",
+ throw runtime_error(make_string("Failed setting position %" PRId64 " of file '%s' of size %" PRId64 ": OS says '%s'",
lastKnownGoodPos, file.GetFileName(), file.GetSize(), last.c_str()));
}
handleSync(file);
@@ -103,8 +118,12 @@ tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos)
}
bool
-handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen,
- int64_t lastKnownGoodPos, bool allowTruncate)
+handleReadError(const char *text,
+ FastOS_FileInterface &file,
+ ssize_t len,
+ ssize_t rlen,
+ int64_t lastKnownGoodPos,
+ bool allowTruncate)
{
bool retval(true);
if (rlen != -1) {
@@ -159,43 +178,6 @@ handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize
}
-Packet
-DomainPart::readPacket(FastOS_FileInterface & transLog, SerialNumRange wanted, size_t targetSize, bool allowTruncate) {
- Alloc buf;
- Packet packet(targetSize);
- int64_t fSize(transLog.GetSize());
- int64_t currPos(transLog.GetPosition());
- for(size_t i(0); (packet.sizeBytes() < targetSize) && (currPos < fSize) && (packet.range().to() < wanted.to()); i++) {
- IChunk::UP chunk;
- if (read(transLog, chunk, buf, allowTruncate)) {
- if (chunk) {
- try {
- for (const Packet::Entry & e : chunk->getEntries()) {
- if ((wanted.from() < e.serial()) && (e.serial() <= wanted.to())) {
- addPacket(packet, e);
- }
- }
- } catch (const std::exception & ex) {
- throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- } else {
- throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- } else {
- if (transLog.GetSize() != fSize) {
- fSize = transLog.GetSize();
- } else {
- throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
- transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
- }
- }
- currPos = transLog.GetPosition();
- }
- return packet;
-}
-
int64_t
DomainPart::buildPacketMapping(bool allowTruncate)
{
@@ -226,35 +208,66 @@ DomainPart::buildPacketMapping(bool allowTruncate)
handleReadError("file header", transLog, 0, FileHeader::getMinSize(), 0, allowTruncate);
}
}
- const SerialNumRange all(0, std::numeric_limits<SerialNum>::max());
while ((currPos < fSize)) {
- const int64_t firstPos(currPos);
- Packet packet = readPacket(transLog, all, TARGET_PACKET_SIZE, allowTruncate);
- if (!packet.empty()) {
- _sz += packet.size();
- const SerialNum firstSerial = packet.range().from();
- if (currPos == _headerLen) {
- _range.from(firstSerial);
+ Packet packet;
+ SerialNum firstSerial(0);
+ SerialNum lastSerial(0);
+ int64_t firstPos(currPos);
+ bool full(false);
+ Alloc buf;
+ for(size_t i(0); !full && (currPos < fSize); i++) {
+ Packet::Entry e;
+ if (read(transLog, e, buf, allowTruncate)) {
+ if (e.valid()) {
+ if (i == 0) {
+ firstSerial = e.serial();
+ if (currPos == _headerLen) {
+ _range.from(firstSerial);
+ }
+ }
+ try {
+ full = addPacket(packet, e);
+ if ( ! full ) {
+ lastSerial = e.serial();
+ currPos = transLog.GetPosition();
+ _sz++;
+ } else {
+ transLog.SetPosition(currPos);
+ }
+ } catch (const std::exception & ex) {
+ throw runtime_error(make_string("%s : Failed creating packet for list %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ ex.what(), transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
+ } else {
+ throw runtime_error(make_string("Invalid entry reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
+ } else {
+ if (transLog.GetSize() != fSize) {
+ fSize = transLog.GetSize();
+ } else {
+ throw runtime_error(make_string("Failed reading file %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ transLog.GetFileName(), fSize, currPos, transLog.GetPosition()));
+ }
}
- _range.to(packet.range().to());
- _packets.insert(std::make_pair(firstSerial, std::move(packet)));
+ }
+ packet.close();
+ if (!packet.empty()) {
+ _packets[firstSerial] = packet;
+ _range.to(lastSerial);
{
LockGuard guard(_lock);
_skipList.push_back(SkipInfo(firstSerial, firstPos));
}
- } else {
- fSize = transLog.GetSize();
}
- currPos = transLog.GetPosition();
}
transLog.Close();
return currPos;
}
-DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding,
- uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) :
- _encoding(encoding),
- _compressionLevel(compressionLevel),
+DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Crc defaultCrc,
+ const FileHeaderContext &fileHeaderContext, bool allowTruncate) :
+ _defaultCrc(defaultCrc),
_lock(),
_fileLock(),
_range(s),
@@ -398,12 +411,10 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
//LOG(spam,
//"Pos(%d) Len(%d), Lim(%d), Remaining(%d)",
//h.getPos(), h.getLength(), h.getLimit(), h.getRemaining());
- IChunk::UP chunk = IChunk::create(_encoding, _compressionLevel);
Packet::Entry entry;
entry.deserialize(h);
if (_range.to() < entry.serial()) {
- chunk->add(entry);
- write(*_transLog, *chunk);
+ write(*_transLog, entry);
_sz++;
_range.to(entry.serial());
} else {
@@ -417,15 +428,17 @@ DomainPart::commit(SerialNum firstSerial, const Packet &packet)
if ( ! _packets.empty() ) {
Packet & lastPacket = _packets.rbegin()->second;
if (lastPacket.sizeBytes() < 0xf000) {
- lastPacket.merge(packet);
- merged = true;
+ if ( ! (merged = lastPacket.merge(packet)) ) {
+ LOG(error, "Failed merging packet [%" PRIu64 ", %" PRIu64 "] with [%" PRIu64 ", %" PRIu64 "]",
+ lastPacket.range().from(), lastPacket.range().to(),
+ packet.range().from(), packet.range().to());
+ }
}
}
if (! merged ) {
- _packets.insert(std::make_pair(firstSerial, std::move(packet)));
+ _packets[firstSerial] = packet;
_skipList.push_back(SkipInfo(firstSerial, firstPos));
}
- sync();
}
void DomainPart::sync()
@@ -493,17 +506,23 @@ DomainPart::visit(SerialNumRange &r, Packet &packet)
if (e.serial() <= r.to()) {
LOG(spam, "Adding serial #%" PRIu64 ", of type %d and size %zd into packet of size %zu and %zu bytes",
e.serial(), e.type(), e.data().size(), newPacket.size(), newPacket.sizeBytes());
- newPacket.add(e);
- r.from(e.serial());
+ if (newPacket.add(e)) {
+ r.from(e.serial());
+ } else {
+ throw runtime_error("Could not add entry to packet. Here is some mumbo jumbo. Fix.");
+ }
} else {
// Force breakout on visiting empty interval.
r.from(r.to());
}
}
}
- packet = std::move(newPacket);
+ newPacket.close();
+ packet = newPacket;
retval = next != _packets.end();
}
+ } else {
+ packet.close();
}
} else {
/// File has been closed must continue from file.
@@ -520,86 +539,131 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet)
if ( ! file.IsOpened() ) {
retval = openAndFind(file, r.from() + 1);
}
- if ( ! retval) {
- return false;
- }
-
- packet = readPacket(file, r, TARGET_PACKET_SIZE, false);
- if (!packet.empty()) {
- r.from(packet.range().to());
+ if (retval) {
+ Packet newPacket;
+ Alloc buf;
+ for (bool full(false);!full && retval && (r.from() < r.to());) {
+ Packet::Entry e;
+ int64_t fPos = file.GetPosition();
+ retval = read(file, e, buf, false);
+ if (retval &&
+ e.valid() &&
+ (r.from() < e.serial()) &&
+ (e.serial() <= r.to())) {
+ try {
+ full = addPacket(newPacket, e);
+ } catch (const std::exception & ex) {
+ throw runtime_error(make_string("%s : Failed creating packet for visit %s(%" PRIu64 ") at pos(%" PRIu64 ", %" PRIu64 ")",
+ ex.what(), file.GetFileName(), file.GetSize(), fPos, file.GetPosition()));
+ }
+ if ( !full ) {
+ r.from(e.serial());
+ } else {
+ if ( ! file.SetPosition(fPos) ) {
+ throw runtime_error(make_string("Failed setting read position for file '%s' of size %" PRId64 " from %" PRId64 " to %" PRId64 ".",
+ file.GetFileName(), file.GetSize(), file.GetPosition(), fPos));
+ }
+ }
+ }
+ }
+ newPacket.close();
+ packet = newPacket;
}
- return !packet.empty();
+ return retval;
}
void
-DomainPart::write(FastOS_FileInterface &file, const IChunk & chunk)
+DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry)
{
- nbostream os;
- size_t begin = os.wp();
- os << _encoding.getRaw();
- os << uint32_t(0);
- Encoding realEncoding = chunk.encode(os);
- size_t end = os.wp();
- os.wp(0);
- os << realEncoding.getRaw();
- os << uint32_t(end - (begin + sizeof(uint32_t) + sizeof(uint8_t)));
- os.wp(end);
int64_t lastKnownGoodPos(file.GetPosition());
+ int32_t crc(0);
+ uint32_t len(entry.serializedSize() + sizeof(crc));
+ nbostream os;
+ os << static_cast<uint8_t>(_defaultCrc);
+ os << len;
+ size_t start(os.size());
+ entry.serialize(os);
+ size_t end(os.size());
+ crc = calcCrc(_defaultCrc, os.c_str()+start, end - start);
+ os << crc;
+ size_t osSize = os.size();
+ assert(osSize == len + sizeof(len) + sizeof(uint8_t));
LockGuard guard(_writeLock);
- if ( ! file.CheckedWrite(os.c_str(), os.size()) ) {
- throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, chunk.range(), os.size()));
+ if ( ! file.CheckedWrite(os.c_str(), osSize) ) {
+ throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, entry, end - start));
}
- _writtenSerial = chunk.range().to();
- _byteSize.store(lastKnownGoodPos + os.size(), std::memory_order_release);
+ _writtenSerial = entry.serial();
+ _byteSize.store(lastKnownGoodPos + osSize, std::memory_order_release);
}
bool
-DomainPart::read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc & buf, bool allowTruncate)
+DomainPart::read(FastOS_FileInterface &file,
+ Packet::Entry &entry,
+ Alloc & buf,
+ bool allowTruncate)
{
+ bool retval(true);
char tmp[5];
int64_t lastKnownGoodPos(file.GetPosition());
size_t rlen = file.Read(tmp, sizeof(tmp));
nbostream his(tmp, sizeof(tmp));
- uint8_t encoding(-1);
+ uint8_t version(-1);
uint32_t len(0);
- his >> encoding >> len;
- if (rlen != sizeof(tmp)) {
- return (rlen == 0)
- ? true
- : handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate);
- }
-
- try {
- chunk = IChunk::create(encoding);
- } catch (const std::exception & e) {
- string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
- " got %d from '%s' at position %ld",
- encoding, file.GetFileName(), lastKnownGoodPos));
- if ((encoding == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
- LOG(warning, "%s", msg.c_str());
- return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
+ his >> version >> len;
+ if ((retval = (rlen == sizeof(tmp)))) {
+ if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) {
+ string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
+ " got %d from '%s' at position %ld",
+ version, file.GetFileName(), lastKnownGoodPos));
+ if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
+ LOG(warning, "%s", msg.c_str());
+ return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
+ } else {
+ throw runtime_error(msg);
+ }
+ }
+ if (len > buf.size()) {
+ Alloc::alloc(len).swap(buf);
+ }
+ rlen = file.Read(buf.get(), len);
+ retval = rlen == len;
+ if (!retval) {
+ retval = handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate);
} else {
- throw runtime_error(msg);
+ nbostream_longlivedbuf is(buf.get(), len);
+ entry.deserialize(is);
+ int32_t crc(0);
+ is >> crc;
+ int32_t crcVerify(calcCrc(static_cast<Crc>(version), buf.get(), len - sizeof(crc)));
+ if (crc != crcVerify) {
+ throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d",
+ file.GetFileName(), file.GetPosition() - len - sizeof(len),
+ static_cast<int>(len), static_cast<int>(crcVerify), static_cast<int>(crc)));
+ }
+ }
+ } else {
+ if (rlen == 0) {
+ // Eof
+ } else {
+ retval = handleReadError("packet length", file, sizeof(len), rlen, lastKnownGoodPos, allowTruncate);
}
}
- if (len > buf.size()) {
- Alloc::alloc(len).swap(buf);
- }
- rlen = file.Read(buf.get(), len);
- if (rlen != len) {
- return handleReadError("packet blob", file, len, rlen, lastKnownGoodPos, allowTruncate);
- }
- try {
- nbostream_longlivedbuf is(buf.get(), len);
- chunk->decode(is);
- } catch (const std::exception & e) {
- throw runtime_error(make_string("Got exception during decoding of packet '%s' from file '%s' (len pos=%" PRId64 ", len=%d)",
- e.what(), file.GetFileName(), file.GetPosition() - len - sizeof(len), static_cast<int>(len)));
- }
+ return retval;
+}
- return true;
+int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz)
+{
+ if (version == xxh64) {
+ return static_cast<int32_t>(XXH64(buf, sz, 0ll));
+ } else if (version == ccitt_crc32) {
+ vespalib::crc_32_type calculator;
+ calculator.process_bytes(buf, sz);
+ return calculator.checksum();
+ } else {
+ abort();
+ }
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
index 5256b731125..59d0df6df94 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
@@ -2,7 +2,6 @@
#pragma once
#include "common.h"
-#include "ichunk.h"
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/memory.h>
#include <map>
@@ -20,9 +19,13 @@ private:
DomainPart& operator=(const DomainPart &);
public:
+ enum Crc {
+ ccitt_crc32=1,
+ xxh64=2
+ };
typedef std::shared_ptr<DomainPart> SP;
- DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding,
- uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
+ DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc,
+ const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
~DomainPart();
@@ -46,13 +49,13 @@ public:
}
bool isClosed() const;
private:
- using Alloc = vespalib::alloc::Alloc;
bool openAndFind(FastOS_FileInterface &file, const SerialNum &from);
int64_t buildPacketMapping(bool allowTruncate);
- static Packet readPacket(FastOS_FileInterface & file, SerialNumRange wanted, size_t targetSize, bool allowTruncate);
- static bool read(FastOS_FileInterface &file, IChunk::UP & chunk, Alloc &buf, bool allowTruncate);
- void write(FastOS_FileInterface &file, const IChunk & entry);
+ static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate);
+
+ void write(FastOS_FileInterface &file, const Packet::Entry &entry);
+ static int32_t calcCrc(Crc crc, const void * buf, size_t len);
void writeHeader(const common::FileHeaderContext &fileHeaderContext);
class SkipInfo
@@ -74,22 +77,21 @@ private:
};
typedef std::vector<SkipInfo> SkipList;
typedef std::map<SerialNum, Packet> PacketList;
- const Encoding _encoding;
- const uint8_t _compressionLevel;
- vespalib::Lock _lock;
- vespalib::Lock _fileLock;
- SerialNumRange _range;
- size_t _sz;
+ const Crc _defaultCrc;
+ vespalib::Lock _lock;
+ vespalib::Lock _fileLock;
+ SerialNumRange _range;
+ size_t _sz;
std::atomic<uint64_t> _byteSize;
- PacketList _packets;
- vespalib::string _fileName;
+ PacketList _packets;
+ vespalib::string _fileName;
std::unique_ptr<FastOS_FileInterface> _transLog;
- SkipList _skipList;
- uint32_t _headerLen;
- vespalib::Lock _writeLock;
+ SkipList _skipList;
+ uint32_t _headerLen;
+ vespalib::Lock _writeLock;
// Protected by _writeLock
- SerialNum _writtenSerial;
- SerialNum _syncedSerial;
+ SerialNum _writtenSerial;
+ SerialNum _syncedSerial;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp b/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
deleted file mode 100644
index af7d396c0e4..00000000000
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "chunks.h"
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/crc.h>
-#include <vespa/vespalib/xxhash/xxhash.h>
-#include <cassert>
-
-using std::runtime_error;
-using std::make_unique;
-using vespalib::make_string;
-using vespalib::nbostream_longlivedbuf;
-using vespalib::compression::CompressionConfig;
-
-namespace search::transactionlog {
-
-Encoding::Encoding(Crc crc, Compression compression)
- : _raw(crc | (compression >> 2))
-{
- assert(crc <= Crc::xxh64);
- assert(compression <= Compression::lz4);
-}
-
-IChunk::~IChunk() = default;
-
-void
-IChunk::add(const Packet::Entry & entry) {
- _entries.emplace_back(entry);
-}
-
-SerialNumRange
-IChunk::range() const {
- return _entries.empty()
- ? SerialNumRange()
- : SerialNumRange(_entries.front().serial(), _entries.back().serial());
-}
-
-void
-IChunk::deserializeEntries(nbostream & is) {
- while (is.good() && !is.empty()) {
- Packet::Entry e;
- e.deserialize(is);
- add(e);
- }
-}
-
-void
-IChunk::serializeEntries(nbostream & os) const {
- for (const auto & e : _entries) {
- e.serialize(os);
- }
-}
-
-Encoding
-IChunk::encode(nbostream & os) const {
- return onEncode(os);
-}
-
-void
-IChunk::decode(nbostream & is) {
- onDecode(is);
-}
-
-IChunk::UP
-IChunk::create(uint8_t chunkType) {
- return create(Encoding(chunkType), 9);
-}
-IChunk::UP
-IChunk::create(Encoding encoding, uint8_t compressionLevel) {
- switch (encoding.getCrc()) {
- case Encoding::Crc::xxh64:
- switch (encoding.getCompression()) {
- case Encoding::Compression::none:
- return make_unique<XXH64None>();
- case Encoding::Compression::lz4:
- return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel);
- case Encoding::Compression::zstd:
- return make_unique<XXH64Compressed>(CompressionConfig::ZSTD, compressionLevel);
- default:
- return make_unique<XXH64Compressed>(CompressionConfig::LZ4, compressionLevel);
- }
- case Encoding::Crc::ccitt_crc32:
- switch (encoding.getCompression()) {
- case Encoding::Compression::none:
- return make_unique<CCITTCRC32None>();
- default:
- throw runtime_error(make_string("Unhandled compression type '%d' for ccitt_crc32 compression",
- encoding.getCompression()));
- }
- default:
- throw runtime_error(make_string("Unhandled crc type '%d'", encoding.getCrc()));
- }
-}
-
-int32_t Encoding::calcCrc(Crc version, const void * buf, size_t sz)
-{
- if (version == xxh64) {
- return static_cast<int32_t>(XXH64(buf, sz, 0ll));
- } else if (version == ccitt_crc32) {
- vespalib::crc_32_type calculator;
- calculator.process_bytes(buf, sz);
- return calculator.checksum();
- } else {
- abort();
- }
-}
-
-}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
deleted file mode 100644
index 0e913703468..00000000000
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ /dev/null
@@ -1,57 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "common.h"
-#include <memory>
-
-namespace vespalib { class nbostream; }
-
-namespace search::transactionlog {
-
-class Encoding {
-public:
- enum Crc {
- nocrc = 0,
- ccitt_crc32 = 1,
- xxh64 = 2
- };
- enum Compression {
- none = 0,
- lz4 = 1,
- zstd = 2
- };
- Encoding(uint8_t raw) : _raw(raw) {}
- Encoding(Crc crc, Compression compression);
- Crc getCrc() const { return Crc(_raw & 0x3); }
- Compression getCompression() const { return Compression((_raw >> 2) & 0xf); }
- static int32_t calcCrc(Crc version, const void * buf, size_t sz);
- uint8_t getRaw() const { return _raw; }
-private:
- uint8_t _raw;
-};
-
-class IChunk {
-public:
- using UP = std::unique_ptr<IChunk>;
- using Entries = std::vector<Packet::Entry>;
- using nbostream = vespalib::nbostream;
- using ConstBufferRef = vespalib::ConstBufferRef;
- virtual ~IChunk();
- const Entries & getEntries() const { return _entries; }
- void add(const Packet::Entry & entry);
- Encoding encode(nbostream & os) const;
- void decode(nbostream & buf);
- static UP create(uint8_t chunkType);
- static UP create(Encoding chunkType, uint8_t compressionLevel);
- SerialNumRange range() const;
-protected:
- virtual Encoding onEncode(nbostream & os) const = 0;
- virtual void onDecode(nbostream & is) = 0;
- void deserializeEntries(nbostream & is);
- void serializeEntries(nbostream & os) const;
-private:
- Entries _entries;
-};
-
-}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index 56aaa162485..cbcbc68fdff 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -36,7 +36,7 @@ Session::VisitTask::run()
bool
Session::visit(FastOS_FileInterface & file, DomainPart & dp) {
- Packet packet(size_t(-1));
+ Packet packet;
bool more(false);
if (dp.isClosed()) {
more = dp.visit(file, _range, packet);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 71b5b85c20f..4c3c5609a93 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -1,12 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogserver.h"
-#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/fnet/frt/supervisor.h>
#include <fstream>
-#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".transactionlog.server");
@@ -15,9 +13,6 @@ using vespalib::make_string;
using vespalib::stringref;
using vespalib::IllegalArgumentException;
using search::common::FileHeaderContext;
-using std::make_shared;
-using std::runtime_error;
-using namespace std::chrono_literals;
namespace search::transactionlog {
@@ -31,16 +26,21 @@ class SyncHandler : public FNET_Task
SerialNum _syncTo;
public:
- SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain,
- const TransLogServer::Session::SP &session, SerialNum syncTo);
+ SyncHandler(FRT_Supervisor *supervisor,
+ FRT_RPCRequest *req,const Domain::SP &domain,
+ const TransLogServer::Session::SP &session,
+ SerialNum syncTo);
~SyncHandler();
void PerformTask() override;
};
-SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain,
- const TransLogServer::Session::SP &session, SerialNum syncTo)
+SyncHandler::SyncHandler(FRT_Supervisor *supervisor,
+ FRT_RPCRequest *req,
+ const Domain::SP &domain,
+ const TransLogServer::Session::SP &session,
+ SerialNum syncTo)
: FNET_Task(supervisor->GetScheduler()),
_req(*req),
_domain(domain),
@@ -50,7 +50,9 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const
}
-SyncHandler::~SyncHandler() = default;
+SyncHandler::~SyncHandler()
+{
+}
void
@@ -75,25 +77,25 @@ SyncHandler::PerformTask()
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const FileHeaderContext &fileHeaderContext)
- : TransLogServer(name, listenPort, baseDir, fileHeaderContext,
- DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none))
- .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 100us))
+ : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000)
{}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg)
- : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4)
+ const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize)
+ : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, DomainPart::Crc::xxh64)
{}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads)
+ const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize,
+ size_t maxThreads, DomainPart::Crc defaultCrcType)
: FRT_Invokable(),
_name(name),
_baseDir(baseDir),
- _domainConfig(cfg),
+ _domainPartSize(domainPartSize),
+ _defaultCrcType(defaultCrcType),
_commitExecutor(maxThreads, 128*1024),
_sessionExecutor(maxThreads, 128*1024),
- _threadPool(0x20000),
+ _threadPool(8192, 1),
_supervisor(std::make_unique<FRT_Supervisor>()),
_domains(),
_reqQ(),
@@ -108,8 +110,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
domainDir >> domainName;
if ( ! domainName.empty()) {
try {
- auto domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor,
- _sessionExecutor, cfg,_fileHeaderContext);
+ auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor,
+ _domainPartSize, _defaultCrcType,_fileHeaderContext);
_domains[domain->name()] = domain;
} catch (const std::exception & e) {
LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what());
@@ -126,17 +128,17 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
listenOk = true;
} else {
LOG(warning, "Failed listening at port %s trying for %d seconds more.", listenSpec, i);
- std::this_thread::sleep_for(1s);
+ FastOS_Thread::Sleep(1000);
}
}
if ( ! listenOk ) {
- throw runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec));
+ throw std::runtime_error(make_string("Failed listening at port %s. Giving up. Requires manual intervention.", listenSpec));
}
} else {
- throw runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno));
+ throw std::runtime_error(make_string("Failed creating tls dir %s r(%d), e(%d). Requires manual intervention.", dir().c_str(), retval, errno));
}
} else {
- throw runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno));
+ throw std::runtime_error(make_string("Failed creating tls base dir %s r(%d), e(%d). Requires manual intervention.", _baseDir.c_str(), retval, errno));
}
start(_threadPool);
}
@@ -152,16 +154,14 @@ TransLogServer::~TransLogServer()
_supervisor->ShutDown(true);
}
-bool
-TransLogServer::onStop()
+bool TransLogServer::onStop()
{
LOG(info, "Stopping TLS");
_reqQ.push(NULL);
return true;
}
-void
-TransLogServer::run()
+void TransLogServer::run()
{
FRT_RPCRequest *req(NULL);
bool hasPacket(false);
@@ -200,12 +200,11 @@ TransLogServer::run()
}
}
logMetric();
- } while (running() && !(hasPacket && (req == nullptr)));
+ } while (running() && !(hasPacket && (req == NULL)));
LOG(info, "TLS Stopped");
}
-void
-TransLogServer::logMetric() const
+void TransLogServer::logMetric() const
{
Guard domainGuard(_lock);
for (DomainList::const_iterator it(_domains.begin()), mt(_domains.end()); it != mt; it++) {
@@ -216,17 +215,6 @@ TransLogServer::logMetric() const
}
}
-
-TransLogServer &
-TransLogServer::setDomainConfig(const DomainConfig & cfg) {
- Guard domainGuard(_lock);
- _domainConfig = cfg;
- for(auto &domain: _domains) {
- domain.second->setConfig(cfg);
- }
- return *this;
-}
-
DomainStats
TransLogServer::getDomainStats() const
{
@@ -261,8 +249,7 @@ TransLogServer::findDomain(const stringref &domainName)
return domain;
}
-void
-TransLogServer::exportRPC(FRT_Supervisor & supervisor)
+void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
{
_supervisor->SetSessionInitHook(FRT_METHOD(TransLogServer::initSession), this);
_supervisor->SetSessionFiniHook(FRT_METHOD(TransLogServer::finiSession), this);
@@ -349,8 +336,7 @@ TransLogServer::exportRPC(FRT_Supervisor & supervisor)
rb.ReturnDesc("syncedto", "Entry synced to");
}
-void
-TransLogServer::createDomain(FRT_RPCRequest *req)
+void TransLogServer::createDomain(FRT_RPCRequest *req)
{
uint32_t retval(0);
FRT_Values & params = *req->GetParams();
@@ -363,8 +349,8 @@ TransLogServer::createDomain(FRT_RPCRequest *req)
Domain::SP domain(findDomain(domainName));
if ( !domain ) {
try {
- domain = make_shared<Domain>(domainName, dir(), _threadPool, _commitExecutor,
- _sessionExecutor, _domainConfig, _fileHeaderContext);
+ domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor,
+ _domainPartSize, _defaultCrcType, _fileHeaderContext);
{
Guard domainGuard(_lock);
_domains[domain->name()] = domain;
@@ -380,8 +366,7 @@ TransLogServer::createDomain(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void
-TransLogServer::deleteDomain(FRT_RPCRequest *req)
+void TransLogServer::deleteDomain(FRT_RPCRequest *req)
{
uint32_t retval(0);
vespalib::string msg("ok");
@@ -420,8 +405,7 @@ TransLogServer::deleteDomain(FRT_RPCRequest *req)
ret.AddString(msg.c_str());
}
-void
-TransLogServer::openDomain(FRT_RPCRequest *req)
+void TransLogServer::openDomain(FRT_RPCRequest *req)
{
uint32_t retval(0);
FRT_Values & params = *req->GetParams();
@@ -438,8 +422,7 @@ TransLogServer::openDomain(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void
-TransLogServer::listDomains(FRT_RPCRequest *req)
+void TransLogServer::listDomains(FRT_RPCRequest *req)
{
FRT_Values & ret = *req->GetReturn();
LOG(debug, "listDomains()");
@@ -454,8 +437,7 @@ TransLogServer::listDomains(FRT_RPCRequest *req)
ret.AddString(domains.c_str());
}
-void
-TransLogServer::domainStatus(FRT_RPCRequest *req)
+void TransLogServer::domainStatus(FRT_RPCRequest *req)
{
FRT_Values & params = *req->GetParams();
FRT_Values & ret = *req->GetReturn();
@@ -475,20 +457,18 @@ TransLogServer::domainStatus(FRT_RPCRequest *req)
}
}
-void
-TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done)
+void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done)
{
(void) done;
Domain::SP domain(findDomain(domainName));
if (domain) {
- domain->commit(packet, std::move(done));
+ domain->commit(packet);
} else {
throw IllegalArgumentException("Could not find domain " + domainName);
}
}
-void
-TransLogServer::domainCommit(FRT_RPCRequest *req)
+void TransLogServer::domainCommit(FRT_RPCRequest *req)
{
FRT_Values & params = *req->GetParams();
FRT_Values & ret = *req->GetReturn();
@@ -498,9 +478,7 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
if (domain) {
Packet packet(params[1]._data._buf, params[1]._data._len);
try {
- vespalib::Gate gate;
- domain->commit(packet, make_shared<GateCallback>(gate));
- gate.await();
+ domain->commit(packet);
ret.AddInt32(0);
ret.AddString("ok");
} catch (const std::exception & e) {
@@ -513,8 +491,7 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
}
}
-void
-TransLogServer::domainVisit(FRT_RPCRequest *req)
+void TransLogServer::domainVisit(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -531,8 +508,7 @@ TransLogServer::domainVisit(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void
-TransLogServer::domainSessionRun(FRT_RPCRequest *req)
+void TransLogServer::domainSessionRun(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -548,15 +524,13 @@ TransLogServer::domainSessionRun(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void
-TransLogServer::relayToThreadRPC(FRT_RPCRequest *req)
+void TransLogServer::relayToThreadRPC(FRT_RPCRequest *req)
{
req->Detach();
_reqQ.push(req);
}
-void
-TransLogServer::domainSessionClose(FRT_RPCRequest *req)
+void TransLogServer::domainSessionClose(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -573,8 +547,7 @@ TransLogServer::domainSessionClose(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void
-TransLogServer::domainPrune(FRT_RPCRequest *req)
+void TransLogServer::domainPrune(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 83627d2d3e3..d78d3d39887 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -22,15 +22,16 @@ public:
typedef std::shared_ptr<TransLogServer> SP;
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads);
+ const common::FileHeaderContext &fileHeaderContext,
+ uint64_t domainPartSize, size_t maxThreads, DomainPart::Crc defaultCrc);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg);
+ const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const common::FileHeaderContext &fileHeaderContext);
~TransLogServer() override;
DomainStats getDomainStats() const;
+
void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override;
- TransLogServer & setDomainConfig(const DomainConfig & cfg);
class Session
{
@@ -78,7 +79,8 @@ private:
vespalib::string _name;
vespalib::string _baseDir;
- DomainConfig _domainConfig;
+ const uint64_t _domainPartSize;
+ const DomainPart::Crc _defaultCrcType;
vespalib::ThreadStackExecutor _commitExecutor;
vespalib::ThreadStackExecutor _sessionExecutor;
FastOS_ThreadPool _threadPool;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
index de3d72331d6..ff4a402b438 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
@@ -7,17 +7,12 @@
LOG_SETUP(".translogserverapp");
using search::common::FileHeaderContext;
-using namespace std::chrono_literals;
namespace search::transactionlog {
-using LockGuard = std::lock_guard<std::mutex>;
-using std::make_unique;
-
TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri,
const FileHeaderContext & fileHeaderContext)
- : _lock(),
- _tls(),
+ : _tls(),
_tlsConfig(),
_tlsConfigFetcher(tlsConfigUri.getContext()),
_fileHeaderContext(fileHeaderContext)
@@ -28,58 +23,24 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri,
namespace {
-
-Encoding::Crc
-getCrc(searchlib::TranslogserverConfig::Crcmethod type)
+DomainPart::Crc getCrc(searchlib::TranslogserverConfig::Crcmethod crcType)
{
- switch (type) {
+ switch (crcType) {
case searchlib::TranslogserverConfig::ccitt_crc32:
- return Encoding::Crc::ccitt_crc32;
+ return DomainPart::ccitt_crc32;
case searchlib::TranslogserverConfig::xxh64:
- return Encoding::Crc::xxh64;
- }
- return Encoding::Crc::xxh64;
-}
-
-Encoding::Compression
-getCompression(searchlib::TranslogserverConfig::Compression::Type type)
-{
- switch (type) {
- case searchlib::TranslogserverConfig::Compression::NONE:
- return Encoding::Compression::none;
- case searchlib::TranslogserverConfig::Compression::LZ4:
- return Encoding::Compression::lz4;
- case searchlib::TranslogserverConfig::Compression::ZSTD:
- return Encoding::Compression::zstd;
+ return DomainPart::xxh64;
}
- return Encoding::Compression::lz4;
-}
-
-Encoding
-getEncoding(const searchlib::TranslogserverConfig & cfg)
-{
- return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type));
-}
-
-DomainConfig
-getDomainConfig(const searchlib::TranslogserverConfig & cfg) {
- DomainConfig dcfg;
- dcfg.setEncoding(getEncoding(cfg))
- .setCompressionLevel(cfg.compression.level)
- .setPartSizeLimit(cfg.filesizemax)
- .setChunkSizeLimit(cfg.chunk.sizelimit)
- .setChunkAgeLimit(std::chrono::microseconds(int64_t(cfg.chunk.agelimit*1000000)));
- return dcfg;
+ abort();
}
}
void TransLogServerApp::start()
{
- LockGuard guard(_lock);
- auto c = _tlsConfig.get();
- _tls = make_unique<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext,
- getDomainConfig(*c), c->maxthreads);
+ std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get();
+ _tls.reset(new TransLogServer(c->servername, c->listenport, c->basedir, _fileHeaderContext,
+ c->filesizemax, c->maxthreads, getCrc(c->crcmethod)));
}
TransLogServerApp::~TransLogServerApp()
@@ -90,12 +51,8 @@ TransLogServerApp::~TransLogServerApp()
void TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg)
{
LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport);
- LockGuard guard(_lock);
_tlsConfig.set(cfg.release());
_tlsConfig.latch();
- if (_tls) {
- _tls->setDomainConfig(getDomainConfig(*cfg));
- }
}
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
index ea6d0158cec..35fa994d1e4 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.h
@@ -14,7 +14,6 @@ namespace search::transactionlog {
class TransLogServerApp : public config::IFetcherCallback<searchlib::TranslogserverConfig>
{
private:
- std::mutex _lock;
TransLogServer::SP _tls;
vespalib::PtrHolder<searchlib::TranslogserverConfig> _tlsConfig;
config::ConfigFetcher _tlsConfigFetcher;
diff --git a/vespalib/src/vespa/vespalib/objects/nbostream.h b/vespalib/src/vespa/vespalib/objects/nbostream.h
index 8eb42ddfd53..70a590f79d1 100644
--- a/vespalib/src/vespa/vespalib/objects/nbostream.h
+++ b/vespalib/src/vespa/vespalib/objects/nbostream.h
@@ -20,7 +20,7 @@ class nbostream
public:
using Buffer = Array<char>;
using Alloc = alloc::Alloc;
- enum State { ok=0, eof=0x01, oob=0x02};
+ enum State { ok=0, eof=0x01};
nbostream(size_t initialSize=1024);
protected:
nbostream(const void * buf, size_t sz, bool longLivedBuffer);
@@ -145,7 +145,6 @@ public:
const char * peek() const { return &_rbuf[_rp]; }
size_t rp() const { return _rp; }
nbostream & rp(size_t pos) { if (pos > _wp) fail(eof); _rp = pos; return *this; }
- nbostream & wp(size_t pos) { if (pos > _wbuf.size()) fail(oob); _wp = pos; return *this; }
size_t wp() const { return _wp; }
State state() const { return _state; }
bool good() const { return _state == ok; }