diff options
Diffstat (limited to 'searchlib/src/tests/transactionlog/translogclient_test.cpp')
-rw-r--r-- | searchlib/src/tests/transactionlog/translogclient_test.cpp | 926 |
1 files changed, 926 insertions, 0 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp new file mode 100644 index 00000000000..775654d23fc --- /dev/null +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -0,0 +1,926 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/searchlib/transactionlog/translogclient.h> +#include <vespa/searchlib/transactionlog/translogserver.h> +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/vespalib/objects/identifiable.h> +#include <vespa/searchlib/index/dummyfileheadercontext.h> +#include <vespa/log/log.h> +#include <map> +LOG_SETUP("translogclient_test"); + +using namespace search; +using namespace transactionlog; +using namespace document; +using namespace vespalib; +using search::index::DummyFileHeaderContext; + +vespalib::string myhex(const void * b, size_t sz) +{ + static const char * hextab="0123456789ABCDEF"; + const unsigned char * c = static_cast<const unsigned char *>(b); + vespalib::string s; + s.reserve(sz*2); + for (size_t i=0; i < sz; i++) { + s += hextab[c[i] >> 4]; + s += hextab[c[i] & 0x0f]; + } + return s; +} + +class Test : public vespalib::TestApp +{ +public: + int Main(); +private: + bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); + TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); + bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); + void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); + void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); + uint32_t countFiles(const vespalib::string &dir); + void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); + bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); + bool subscribeDomainTest(TransLogClient & tls, const vespalib::string & name); + bool partialUpdateTest(); + bool test1(); + bool testRemove(); + void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains); + void verifyDomain(const vespalib::string & name); + void testCrcVersions(); + bool test2(); + void testMany(); + void testErase(); + void testSync(); + void testTruncateOnShortRead(); + void testTruncateOnVersionMismatch(); +}; + +TEST_APPHOOK(Test); + +class CallBackTest : public TransLogClient::Subscriber::Callback +{ +private: + virtual RPC::Result receive(const Packet & packet); + virtual void inSync() { _inSync = true; } + virtual void eof() { _eof = true; } + typedef std::map<SerialNum, ByteBuffer> PacketMap; + PacketMap _packetMap; +public: + CallBackTest() : _inSync(false), _eof(false) { } + size_t size() const { return _packetMap.size(); } + bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); } + void clear() { _inSync = false; _eof = false; _packetMap.clear(); } + const ByteBuffer & packet(SerialNum n) { return (_packetMap.find(n)->second); } + + bool _inSync; + bool _eof; +}; + +RPC::Result CallBackTest::receive(const Packet & p) +{ + vespalib::nbostream h(p.getHandle().c_str(), p.getHandle().size(), true); + LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(h.peek(), h.size()).c_str()); + while(h.size() > 0) { + Packet::Entry e; + e.deserialize(h); + LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(e.data().c_str(), e.data().size()).c_str()); + _packetMap[e.serial()] = ByteBuffer(e.data().c_str(), e.data().size()); + } + return RPC::OK; +} + +class CallBackManyTest : public TransLogClient::Subscriber::Callback +{ +private: + virtual RPC::Result receive(const Packet & packet); + virtual void inSync() { _inSync = true; } + virtual void eof() { _eof = true; } +public: + CallBackManyTest(size_t start) : _inSync(false), _eof(false), _count(start), _value(start) { } + void clear() { _inSync = false; _eof = false; _count = 0; _value = 0; } + bool _inSync; + bool _eof; + size_t _count; + size_t _value; +}; + +RPC::Result CallBackManyTest::receive(const Packet & p) +{ + nbostream h(p.getHandle().c_str(), p.getHandle().size(), true); + for(;h.size() > 0; _count++, _value++) { + Packet::Entry e; + e.deserialize(h); + assert(e.data().size() == 8); + size_t v = *(const size_t*) e.data().c_str(); + assert(_count+1 == e.serial()); + assert(v == _value); + (void) v; + } + return RPC::OK; +} + +class CallBackUpdate : public TransLogClient::Subscriber::Callback +{ +public: + typedef std::map<SerialNum, Identifiable *> PacketMap; +private: + virtual RPC::Result receive(const Packet & packet); + virtual void inSync() { _inSync = true; } + virtual void eof() { _eof = true; } + PacketMap _packetMap; +public: + CallBackUpdate() : _inSync(false), _eof(false) { } + virtual ~CallBackUpdate() { while (_packetMap.begin() != _packetMap.end()) { delete _packetMap.begin()->second; _packetMap.erase(_packetMap.begin()); } } + bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); } + const PacketMap & map() const { return _packetMap; } + bool _inSync; + bool _eof; +}; + + +RPC::Result CallBackUpdate::receive(const Packet & packet) +{ + nbostream h(packet.getHandle().c_str(), packet.getHandle().size(), true); + while (h.size() > 0) { + Packet::Entry e; + e.deserialize(h); + const vespalib::Identifiable::RuntimeClass * cl(vespalib::Identifiable::classFromId(e.type())); + if (cl) { + vespalib::Identifiable * obj(cl->create()); + if (obj->inherits(Identifiable::classId)) { + Identifiable * ser = static_cast<Identifiable *>(obj); + nbostream is(e.data().c_str(), e.data().size()); + try { + is >> *ser; + } catch (std::exception & ex) { + LOG(warning, "Failed deserializing (%" PRId64 ", %s) bb(%zu, %zu, %zu)=%s what=%s", e.serial(), cl->name(), is.rp(), is.size(), is.capacity(), myhex(is.peek(), is.size()).c_str(), ex.what()); + assert(false); + return RPC::ERROR; + } + assert(is.state() == nbostream::ok); + assert(is.size() == 0); + _packetMap[e.serial()] = ser; + } else { + LOG(warning, "Packet::Entry(%" PRId64 ", %s) is not a Identifiable", e.serial(), cl->name()); + } + } else { + LOG(warning, "Packet::Entry(%" PRId64 ", %d) is not recognized by vespalib::Identifiable", e.serial(), e.type()); + } + } + return RPC::OK; +} + +class CallBackStatsTest : public TransLogClient::Session::Callback +{ +private: + virtual RPC::Result receive(const Packet & packet); + virtual void inSync() { _inSync = true; } + virtual void eof() { _eof = true; } +public: + CallBackStatsTest() : _inSync(false), _eof(false), + _count(0), _inOrder(0), + _firstSerial(0), _lastSerial(0), + _prevSerial(0) { } + void clear() { _inSync = false; _eof = false; _count = 0; _inOrder = 0; + _firstSerial = 0; _lastSerial = 0; _inOrder = 0; } + bool _inSync; + bool _eof; + uint64_t _count; + uint64_t _inOrder; // increase when next entry is one above previous + SerialNum _firstSerial; + SerialNum _lastSerial; + SerialNum _prevSerial; +}; + +RPC::Result CallBackStatsTest::receive(const Packet & p) +{ + nbostream h(p.getHandle().c_str(), p.getHandle().size(), true); + for(;h.size() > 0; ++_count) { + Packet::Entry e; + e.deserialize(h); + SerialNum s = e.serial(); + if (_count == 0) { + _firstSerial = s; + _lastSerial = s; + } + if (s == _prevSerial + 1) { + ++_inOrder; + } + _prevSerial = s; + if (_firstSerial > s) { + _firstSerial = s; + } + if (_lastSerial < s) { + _lastSerial = s; + } + } + return RPC::OK; +} + +#define CID_TestIdentifiable 0x5762314 + +class TestIdentifiable : public Identifiable +{ +public: + DECLARE_IDENTIFIABLE(TestIdentifiable); + TestIdentifiable() { } +}; + +IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable); + +bool Test::partialUpdateTest() +{ + bool retval(false); + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); + TransLogClient::Session & session = *s1; + + TestIdentifiable du; + + nbostream os; + os << du; + + vespalib::ConstBufferRef bb(os.c_str(), os.size()); + LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); + Packet::Entry e(7, du.getClass().id(), bb); + Packet pa; + pa.add(e); + pa.close(); + ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().c_str(), pa.getHandle().size()))); + + CallBackUpdate ca; + TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(5, 7) ); + for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ! ca._inSync ); + ASSERT_TRUE( ca._eof ); + ASSERT_TRUE( ca.map().size() == 1); + ASSERT_TRUE( ca.hasSerial(7) ); + + CallBackUpdate ca1; + TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); + ASSERT_TRUE(visitor1.get()); + ASSERT_TRUE( visitor1->visit(4, 5) ); + for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ! ca1._inSync ); + ASSERT_TRUE( ca1._eof ); + ASSERT_TRUE( ca1.map().size() == 0); + + CallBackUpdate ca2; + TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); + ASSERT_TRUE(visitor2.get()); + ASSERT_TRUE( visitor2->visit(5, 6) ); + for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ! ca2._inSync ); + ASSERT_TRUE( ca2._eof ); + ASSERT_TRUE( ca2.map().size() == 0); + + CallBackUpdate ca3; + TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); + ASSERT_TRUE(visitor3.get()); + ASSERT_TRUE( visitor3->visit(5, 1000) ); + for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ! ca3._inSync ); + ASSERT_TRUE( ca3._eof ); + ASSERT_TRUE( ca3.map().size() == 1); + ASSERT_TRUE( ca3.hasSerial(7) ); + + return retval; +} + +bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) +{ + bool retval(true); + std::vector<vespalib::string> dir; + tls.listDomains(dir); + EXPECT_EQUAL (dir.size(), preExistingDomains); + TransLogClient::Session::UP s1 = tls.open(name); + ASSERT_TRUE (s1.get() == NULL); + retval = tls.create(name); + ASSERT_TRUE (retval); + dir.clear(); + tls.listDomains(dir); + EXPECT_EQUAL (dir.size(), preExistingDomains+1); +// ASSERT_TRUE (dir[0] == name); + return retval; +} + +TransLogClient::Session::UP Test::openDomainTest(TransLogClient & tls, const vespalib::string & name) +{ + TransLogClient::Session::UP s1 = tls.open(name); + ASSERT_TRUE (s1.get() != NULL); + return s1; +} + +bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) +{ + bool retval(true); + Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20)); + Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20)); + Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20)); + + Packet a; + ASSERT_TRUE (a.add(e1)); + Packet b; + ASSERT_TRUE (b.add(e2)); + ASSERT_TRUE (b.add(e3)); + ASSERT_TRUE (!b.add(e1)); + a.close(); + b.close(); + ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size()))); + ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().c_str(), b.getHandle().size()))); + try { + s1->commit(vespalib::ConstBufferRef(a.getHandle().c_str(), a.getHandle().size())); + ASSERT_TRUE(false); + } catch (const std::exception & e) { + EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what()); + } + EXPECT_EQUAL(a.size(), 1u); + EXPECT_EQUAL(a.range().from(), 1u); + EXPECT_EQUAL(a.range().to(), 1u); + EXPECT_EQUAL(b.size(), 2u); + EXPECT_EQUAL(b.range().from(), 2u); + EXPECT_EQUAL(b.range().to(), 3u); + EXPECT_TRUE(a.merge(b)); + EXPECT_EQUAL(a.size(), 3u); + EXPECT_EQUAL(a.range().from(), 1u); + EXPECT_EQUAL(a.range().to(), 3u); + + Packet::Entry e; + vespalib::nbostream h(a.getHandle().c_str(), a.getHandle().size()); + e.deserialize(h); + e.deserialize(h); + e.deserialize(h); + EXPECT_EQUAL(h.size(), 0u); + + return retval; +} + +void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) +{ + size_t value(0); + for(size_t i=0; i < numPackets; i++) { + std::unique_ptr<Packet> p(new Packet()); + for(size_t j=0; j < numEntries; j++, value++) { + Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); + if ( ! p->add(e) ) { + p->close(); + ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); + p.reset(new Packet()); + ASSERT_TRUE(p->add(e)); + } + } + p->close(); + ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); + } +} + + +void +Test::fillDomainTest(TransLogClient::Session * s1, + size_t numPackets, size_t numEntries, + size_t entrySize) +{ + size_t value(0); + std::vector<char> entryBuffer(entrySize); + for(size_t i=0; i < numPackets; i++) { + std::unique_ptr<Packet> p(new Packet()); + for(size_t j=0; j < numEntries; j++, value++) { + Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size())); + if ( ! p->add(e) ) { + p->close(); + ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); + p.reset(new Packet()); + ASSERT_TRUE(p->add(e)); + } + } + p->close(); + ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().c_str(), p->getHandle().size()))); + } +} + + +uint32_t +Test::countFiles(const vespalib::string &dir) +{ + uint32_t res = 0; + FastOS_DirectoryScan dirScan(dir.c_str()); + while (dirScan.ReadNext()) { + const char *ename = dirScan.GetName(); + if (strcmp(ename, ".") == 0 || + strcmp(ename, "..") == 0) + continue; + ++res; + } + return res; +} + + +void +Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, + size_t numEntries) +{ + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + EXPECT_EQUAL(b, 1u); + EXPECT_EQUAL(e, numEntries); + EXPECT_EQUAL(c, numEntries); +} + + +bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) +{ + bool retval(true); + + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + EXPECT_EQUAL(b, 1u); + EXPECT_EQUAL(e, 3u); + EXPECT_EQUAL(c, 3u); + + CallBackTest ca; + TransLogClient::Visitor::UP visitor = tls.createVisitor(name, ca); + ASSERT_TRUE(visitor.get()); + EXPECT_TRUE( visitor->visit(0, 1) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } + EXPECT_TRUE( ! ca._inSync ); + EXPECT_TRUE( ca._eof ); + EXPECT_TRUE( ! ca.hasSerial(0) ); + EXPECT_TRUE( ca.hasSerial(1) ); + EXPECT_TRUE( ! ca.hasSerial(2) ); + ca.clear(); + + visitor = tls.createVisitor(name, ca); + ASSERT_TRUE(visitor.get()); + EXPECT_TRUE( visitor->visit(1, 2) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } + EXPECT_TRUE( ! ca._inSync ); + EXPECT_TRUE( ca._eof ); + EXPECT_TRUE( ! ca.hasSerial(0) ); + EXPECT_TRUE( ! ca.hasSerial(1) ); + EXPECT_TRUE( ca.hasSerial(2) ); + EXPECT_TRUE( ! ca.hasSerial(3) ); + ca.clear(); + + visitor = tls.createVisitor(name, ca); + EXPECT_TRUE(visitor.get()); + EXPECT_TRUE( visitor->visit(0, 3) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } + EXPECT_TRUE( ! ca._inSync ); + EXPECT_TRUE( ca._eof ); + EXPECT_TRUE( ! ca.hasSerial(0) ); + EXPECT_TRUE( ca.hasSerial(1) ); + EXPECT_TRUE( ca.hasSerial(2) ); + EXPECT_TRUE( ca.hasSerial(3) ); + ca.clear(); + + visitor = tls.createVisitor(name, ca); + ASSERT_TRUE(visitor.get()); + EXPECT_TRUE( visitor->visit(2, 3) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } + EXPECT_TRUE( ! ca._inSync ); + EXPECT_TRUE( ca._eof ); + EXPECT_TRUE( ! ca.hasSerial(0) ); + EXPECT_TRUE( !ca.hasSerial(1) ); + EXPECT_TRUE( !ca.hasSerial(2) ); + EXPECT_TRUE( ca.hasSerial(3) ); + ca.clear(); + + return retval; +} + +bool Test::subscribeDomainTest(TransLogClient & tls, const vespalib::string & name) +{ + bool retval(true); + CallBackTest ca; + TransLogClient::Subscriber::UP subscriber = tls.createSubscriber(name, ca); + ASSERT_TRUE(subscriber.get()); + ASSERT_TRUE( subscriber->subscribe(0) ); + for (size_t i(0); ! ca._inSync && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ca._inSync ); + ASSERT_TRUE( ! ca.hasSerial(0) ); + ASSERT_TRUE( ! ca._eof ); + ASSERT_TRUE( ca.hasSerial(1) ); + ASSERT_TRUE( ca.hasSerial(2) ); + ASSERT_TRUE( ca.hasSerial(3) ); + return retval; +} + +bool Test::test1() +{ + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + + vespalib::string name("test1"); + createDomainTest(tls, name); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + fillDomainTest(s1.get(), name); + visitDomainTest(tls, s1.get(), name); + subscribeDomainTest(tls, name); + return true; +} + +void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains) +{ + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, false, 4, crcMethod); + TransLogClient tls("tcp/localhost:18377"); + + createDomainTest(tls, name, preExistingDomains); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + fillDomainTest(s1.get(), name); +} + +void Test::verifyDomain(const vespalib::string & name) +{ + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + visitDomainTest(tls, s1.get(), name); +} + +void Test::testCrcVersions() +{ + createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0); + createAndFillDomain("xxh64", DomainPart::xxh64, 1); + + verifyDomain("ccitt_crc32"); + verifyDomain("xxh64"); +} + +bool Test::testRemove() +{ + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + + vespalib::string name("test-delete"); + createDomainTest(tls, name); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + fillDomainTest(s1.get(), name); + visitDomainTest(tls, s1.get(), name); + subscribeDomainTest(tls, name); + ASSERT_TRUE(tls.remove(name)); + + return true; +} + +bool Test::test2() +{ + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls("tcp/localhost:18377"); + + vespalib::string name("test1"); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + visitDomainTest(tls, s1.get(), name); + subscribeDomainTest(tls, name); + return true; +} + +namespace { + +void +assertVisitStats(TransLogClient &tls, const vespalib::string &domain, + SerialNum visitStart, SerialNum visitEnd, + SerialNum expFirstSerial, SerialNum expLastSerial, + uint64_t expCount, uint64_t expInOrder) +{ + CallBackStatsTest ca; + TransLogClient::Visitor::UP visitor = tls.createVisitor(domain, ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(visitStart, visitEnd) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { + FastOS_Thread::Sleep(10); + } + ASSERT_TRUE(!ca._inSync); + ASSERT_TRUE(ca._eof); + EXPECT_EQUAL(expFirstSerial, ca._firstSerial); + EXPECT_EQUAL(expLastSerial, ca._lastSerial); + EXPECT_EQUAL(expCount, ca._count); + EXPECT_EQUAL(expInOrder, ca._inOrder); +} + +void +assertStatus(TransLogClient::Session &s, + SerialNum expFirstSerial, SerialNum expLastSerial, + uint64_t expCount) +{ + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s.status(b, e, c)); + EXPECT_EQUAL(expFirstSerial, b); + EXPECT_EQUAL(expLastSerial, e); + EXPECT_EQUAL(expCount, c); +} + +} + + +void Test::testMany() +{ + const unsigned int NUM_PACKETS = 1000; + const unsigned int NUM_ENTRIES = 100; + const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000); + TransLogClient tls("tcp/localhost:18377"); + + createDomainTest(tls, "many", 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); + fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + EXPECT_EQUAL(b, 1u); + EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); + CallBackManyTest ca(2); + TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ! ca._inSync ); + ASSERT_TRUE( ca._eof ); + EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); + } + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + EXPECT_EQUAL(b, 1u); + EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); + CallBackManyTest ca(2); + TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } + ASSERT_TRUE( ! ca._inSync ); + ASSERT_TRUE( ca._eof ); + EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); + } +} + +void Test::testErase() +{ + const unsigned int NUM_PACKETS = 1000; + const unsigned int NUM_ENTRIES = 100; + const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000); + TransLogClient tls("tcp/localhost:18377"); + + createDomainTest(tls, "erase", 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); + fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); + } + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); + + // Before erase + TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, + 3, TOTAL_NUM_ENTRIES, + TOTAL_NUM_ENTRIES -2, TOTAL_NUM_ENTRIES - 3)); + DomainStats domainStats = tlss.getDomainStats(); + DomainInfo domainInfo = domainStats["erase"]; + size_t numParts = domainInfo.parts.size(); + LOG(info, "%zu parts", numParts); + for (uint32_t partId = 0; partId < numParts; ++partId) { + const PartInfo &part = domainInfo.parts[partId]; + LOG(info, + "part %u from %" PRIu64 " to %" PRIu64 ", " + "count %zu, numBytes %zu", + partId, + (uint64_t) part.range.from(), (uint64_t) part.range.to(), + part.count, part.byteSize); + } + ASSERT_LESS_EQUAL(2u, numParts); + // Erase everything before second to last domainpart file + SerialNum eraseSerial = domainInfo.parts[numParts - 2].range.from(); + s1->erase(eraseSerial); + TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, + eraseSerial, TOTAL_NUM_ENTRIES, + TOTAL_NUM_ENTRIES + 1 - eraseSerial, + TOTAL_NUM_ENTRIES - eraseSerial)); + TEST_DO(assertStatus(*s1, eraseSerial, TOTAL_NUM_ENTRIES, + domainInfo.parts[numParts - 2].count + + domainInfo.parts[numParts - 1].count)); + // No apparent effect of erasing just first entry in 2nd to last part + s1->erase(eraseSerial + 1); + TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, + eraseSerial, TOTAL_NUM_ENTRIES, + TOTAL_NUM_ENTRIES + 1 - eraseSerial, + TOTAL_NUM_ENTRIES - eraseSerial)); + TEST_DO(assertStatus(*s1, eraseSerial + 1, TOTAL_NUM_ENTRIES, + domainInfo.parts[numParts - 2].count + + domainInfo.parts[numParts - 1].count)); + // No apparent effect of erasing almost all of 2nd to last part + SerialNum eraseSerial2 = domainInfo.parts[numParts - 2].range.to(); + s1->erase(eraseSerial2); + TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, + eraseSerial, TOTAL_NUM_ENTRIES, + TOTAL_NUM_ENTRIES + 1 - eraseSerial, + TOTAL_NUM_ENTRIES - eraseSerial)); + TEST_DO(assertStatus(*s1, eraseSerial2, TOTAL_NUM_ENTRIES, + domainInfo.parts[numParts - 2].count + + domainInfo.parts[numParts - 1].count)); + // Erase everything before last domainpart file + eraseSerial = domainInfo.parts[numParts - 1].range.from(); + s1->erase(eraseSerial); + TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, + eraseSerial, TOTAL_NUM_ENTRIES, + TOTAL_NUM_ENTRIES + 1 - eraseSerial, + TOTAL_NUM_ENTRIES - eraseSerial)); + TEST_DO(assertStatus(*s1, eraseSerial, TOTAL_NUM_ENTRIES, + domainInfo.parts[numParts - 1].count)); + // No apparent effect of erasing just first entry in last part + s1->erase(eraseSerial + 1); + TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, + eraseSerial, TOTAL_NUM_ENTRIES, + TOTAL_NUM_ENTRIES + 1 - eraseSerial, + TOTAL_NUM_ENTRIES - eraseSerial)); + TEST_DO(assertStatus(*s1, eraseSerial + 1, TOTAL_NUM_ENTRIES, + domainInfo.parts[numParts - 1].count)); + // No apparent effect of erasing almost all of last part + eraseSerial2 = domainInfo.parts[numParts - 1].range.to(); + s1->erase(eraseSerial2); + TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, + eraseSerial, TOTAL_NUM_ENTRIES, + TOTAL_NUM_ENTRIES + 1 - eraseSerial, + TOTAL_NUM_ENTRIES - eraseSerial)); + TEST_DO(assertStatus(*s1, eraseSerial2, TOTAL_NUM_ENTRIES, + domainInfo.parts[numParts - 1].count)); + } +} + + +void +Test::testSync() +{ + const unsigned int NUM_PACKETS = 3; + const unsigned int NUM_ENTRIES = 4; + const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000); + TransLogClient tls("tcp/localhost:18377"); + + createDomainTest(tls, "sync", 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); + + SerialNum syncedTo(0); + + EXPECT_TRUE(s1->sync(2, syncedTo)); + EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES); +} + + +void +Test::testTruncateOnVersionMismatch() +{ + const unsigned int NUM_PACKETS = 3; + const unsigned int NUM_ENTRIES = 4; + const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + + uint64_t fromOld(0), toOld(0); + size_t countOld(0); + DummyFileHeaderContext fileHeaderContext; + { + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogClient tls("tcp/localhost:18377"); + + createDomainTest(tls, "sync", 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); + EXPECT_TRUE(s1->status(fromOld, toOld, countOld)); + SerialNum syncedTo(0); + + EXPECT_TRUE(s1->sync(2, syncedTo)); + EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES); + } + FastOS_File f("test11/sync/sync-0000000000000000"); + EXPECT_TRUE(f.OpenWriteOnlyExisting()); + EXPECT_TRUE(f.SetPosition(f.GetSize())); + + char tmp[100]; + memset(tmp, 0, sizeof(tmp)); + EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp))); + EXPECT_TRUE(f.Close()); + { + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogClient tls("tcp/localhost:18377"); + TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + uint64_t from(0), to(0); + size_t count(0); + EXPECT_TRUE(s1->status(from, to, count)); + ASSERT_EQUAL(fromOld, from); + ASSERT_EQUAL(toOld, to); + ASSERT_EQUAL(countOld, count); + } +} + +void +Test::testTruncateOnShortRead() +{ + const unsigned int NUM_PACKETS = 17; + const unsigned int NUM_ENTRIES = 1; + const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + const unsigned int ENTRYSIZE = 4080; + vespalib::string topdir("test10"); + vespalib::string domain("truncate"); + vespalib::string dir(topdir + "/" + domain); + vespalib::string tlsspec("tcp/localhost:18377"); + + DummyFileHeaderContext fileHeaderContext; + { + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls(tlsspec); + + createDomainTest(tls, domain, 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES, ENTRYSIZE); + + SerialNum syncedTo(0); + + EXPECT_TRUE(s1->sync(TOTAL_NUM_ENTRIES, syncedTo)); + EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES); + } + { + EXPECT_EQUAL(2u, countFiles(dir)); + } + { + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls(tlsspec); + TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); + } + { + EXPECT_EQUAL(2u, countFiles(dir)); + } + { + vespalib::string filename(dir + "/truncate-0000000000000017"); + FastOS_File trfile(filename.c_str()); + EXPECT_TRUE(trfile.OpenReadWrite(NULL)); + trfile.SetSize(trfile.getSize() - 1); + trfile.Close(); + } + { + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogClient tls(tlsspec); + TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); + } + { + EXPECT_EQUAL(2u, countFiles(dir)); + } +} + + +int Test::Main() +{ + TEST_INIT("translogclient_test"); + + if (_argc > 0) { + DummyFileHeaderContext::setCreator(_argv[0]); + } + test1(); + test2(); + testMany(); + testErase(); + partialUpdateTest(); + + testRemove(); + + testSync(); + + testTruncateOnShortRead(); + testTruncateOnVersionMismatch(); + + testCrcVersions(); + + TEST_DONE(); +} |