diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-11 15:58:53 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-11 19:38:04 +0200 |
commit | 1aeded94c43e98b1b82c4a71530ea46ab2389bb2 (patch) | |
tree | c8cdfa5a401bbbd75133b44de95782cfb138d741 /searchlib/src/tests | |
parent | e3dea9daba5b89fab618921a94f9394f8ae9144c (diff) |
Remove subscribe to TLS.
Diffstat (limited to 'searchlib/src/tests')
-rw-r--r-- | searchlib/src/tests/transactionlog/translogclient_test.cpp | 60 | ||||
-rw-r--r-- | searchlib/src/tests/transactionlogstress/translogstress.cpp | 137 |
2 files changed, 17 insertions, 180 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index efd3b70858a..9f83db9b23a 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -42,7 +42,6 @@ private: uint32_t countFiles(const vespalib::string &dir); void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); - bool subscribeDomainTest(TransLogClient & tls, const vespalib::string & name); bool partialUpdateTest(); bool test1(); bool testRemove(); @@ -59,22 +58,20 @@ private: TEST_APPHOOK(Test); -class CallBackTest : public TransLogClient::Subscriber::Callback +class CallBackTest : public TransLogClient::Visitor::Callback { private: virtual RPC::Result receive(const Packet & packet) override; - virtual void inSync() override { _inSync = true; } virtual void eof() override { _eof = true; } typedef std::map<SerialNum, ByteBuffer> PacketMap; PacketMap _packetMap; public: - CallBackTest() : _inSync(false), _eof(false) { } + CallBackTest() : _eof(false) { } size_t size() const { return _packetMap.size(); } bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); } - void clear() { _inSync = false; _eof = false; _packetMap.clear(); } + void clear() { _eof = false; _packetMap.clear(); } const ByteBuffer & packet(SerialNum n) { return (_packetMap.find(n)->second); } - bool _inSync; bool _eof; }; @@ -91,16 +88,14 @@ RPC::Result CallBackTest::receive(const Packet & p) return RPC::OK; } -class CallBackManyTest : public TransLogClient::Subscriber::Callback +class CallBackManyTest : public TransLogClient::Visitor::Callback { private: virtual RPC::Result receive(const Packet & packet) override; - virtual void inSync() override { _inSync = true; } virtual void eof() override { _eof = true; } public: - CallBackManyTest(size_t start) : _inSync(false), _eof(false), _count(start), _value(start) { } - void clear() { _inSync = false; _eof = false; _count = 0; _value = 0; } - bool _inSync; + CallBackManyTest(size_t start) : _eof(false), _count(start), _value(start) { } + void clear() { _eof = false; _count = 0; _value = 0; } bool _eof; size_t _count; size_t _value; @@ -121,21 +116,19 @@ RPC::Result CallBackManyTest::receive(const Packet & p) return RPC::OK; } -class CallBackUpdate : public TransLogClient::Subscriber::Callback +class CallBackUpdate : public TransLogClient::Visitor::Callback { public: typedef std::map<SerialNum, Identifiable *> PacketMap; private: virtual RPC::Result receive(const Packet & packet) override; - virtual void inSync() override { _inSync = true; } virtual void eof() override { _eof = true; } PacketMap _packetMap; public: - CallBackUpdate() : _inSync(false), _eof(false) { } + CallBackUpdate() : _eof(false) { } virtual ~CallBackUpdate() { while (_packetMap.begin() != _packetMap.end()) { delete _packetMap.begin()->second; _packetMap.erase(_packetMap.begin()); } } bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); } const PacketMap & map() const { return _packetMap; } - bool _inSync; bool _eof; }; @@ -176,16 +169,14 @@ class CallBackStatsTest : public TransLogClient::Session::Callback { private: virtual RPC::Result receive(const Packet & packet) override; - virtual void inSync() override { _inSync = true; } virtual void eof() override { _eof = true; } public: - CallBackStatsTest() : _inSync(false), _eof(false), + CallBackStatsTest() : _eof(false), _count(0), _inOrder(0), _firstSerial(0), _lastSerial(0), _prevSerial(0) { } - void clear() { _inSync = false; _eof = false; _count = 0; _inOrder = 0; + void clear() { _eof = false; _count = 0; _inOrder = 0; _firstSerial = 0; _lastSerial = 0; _inOrder = 0; } - bool _inSync; bool _eof; uint64_t _count; uint64_t _inOrder; // increase when next entry is one above previous @@ -258,7 +249,6 @@ bool Test::partialUpdateTest() ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(5, 7) ); for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca._inSync ); ASSERT_TRUE( ca._eof ); ASSERT_TRUE( ca.map().size() == 1); ASSERT_TRUE( ca.hasSerial(7) ); @@ -268,7 +258,6 @@ bool Test::partialUpdateTest() ASSERT_TRUE(visitor1.get()); ASSERT_TRUE( visitor1->visit(4, 5) ); for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca1._inSync ); ASSERT_TRUE( ca1._eof ); ASSERT_TRUE( ca1.map().size() == 0); @@ -277,7 +266,6 @@ bool Test::partialUpdateTest() ASSERT_TRUE(visitor2.get()); ASSERT_TRUE( visitor2->visit(5, 6) ); for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca2._inSync ); ASSERT_TRUE( ca2._eof ); ASSERT_TRUE( ca2.map().size() == 0); @@ -286,7 +274,6 @@ bool Test::partialUpdateTest() ASSERT_TRUE(visitor3.get()); ASSERT_TRUE( visitor3->visit(5, 1000) ); for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca3._inSync ); ASSERT_TRUE( ca3._eof ); ASSERT_TRUE( ca3.map().size() == 1); ASSERT_TRUE( ca3.hasSerial(7) ); @@ -451,7 +438,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(0, 1) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - EXPECT_TRUE( ! ca._inSync ); EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ca.hasSerial(1) ); @@ -462,7 +448,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(1, 2) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - EXPECT_TRUE( ! ca._inSync ); EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ! ca.hasSerial(1) ); @@ -474,7 +459,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c EXPECT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(0, 3) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - EXPECT_TRUE( ! ca._inSync ); EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ca.hasSerial(1) ); @@ -486,7 +470,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(2, 3) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - EXPECT_TRUE( ! ca._inSync ); EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( !ca.hasSerial(1) ); @@ -497,23 +480,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c return retval; } -bool Test::subscribeDomainTest(TransLogClient & tls, const vespalib::string & name) -{ - bool retval(true); - CallBackTest ca; - TransLogClient::Subscriber::UP subscriber = tls.createSubscriber(name, ca); - ASSERT_TRUE(subscriber.get()); - ASSERT_TRUE( subscriber->subscribe(0) ); - for (size_t i(0); ! ca._inSync && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ca._inSync ); - ASSERT_TRUE( ! ca.hasSerial(0) ); - ASSERT_TRUE( ! ca._eof ); - ASSERT_TRUE( ca.hasSerial(1) ); - ASSERT_TRUE( ca.hasSerial(2) ); - ASSERT_TRUE( ca.hasSerial(3) ); - return retval; -} - bool Test::test1() { DummyFileHeaderContext fileHeaderContext; @@ -525,7 +491,6 @@ bool Test::test1() TransLogClient::Session::UP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); - subscribeDomainTest(tls, name); return true; } @@ -569,7 +534,6 @@ bool Test::testRemove() TransLogClient::Session::UP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); - subscribeDomainTest(tls, name); ASSERT_TRUE(tls.remove(name)); return true; @@ -584,7 +548,6 @@ bool Test::test2() vespalib::string name("test1"); TransLogClient::Session::UP s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); - subscribeDomainTest(tls, name); return true; } @@ -603,7 +566,6 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain, for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE(!ca._inSync); ASSERT_TRUE(ca._eof); EXPECT_EQUAL(expFirstSerial, ca._firstSerial); EXPECT_EQUAL(expLastSerial, ca._lastSerial); @@ -651,7 +613,6 @@ void Test::testMany() ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca._inSync ); ASSERT_TRUE( ca._eof ); EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); @@ -673,7 +634,6 @@ void Test::testMany() ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca._inSync ); ASSERT_TRUE( ca._eof ); EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index aabfdb62a96..abba84b75b6 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -23,11 +23,9 @@ using vespalib::make_string; using vespalib::ConstBufferRef; using search::index::DummyFileHeaderContext; -namespace search { -namespace transactionlog { +namespace search::transactionlog { using ClientSession = TransLogClient::Session; -using Subscriber = TransLogClient::Subscriber; using Visitor = TransLogClient::Visitor; //----------------------------------------------------------------------------- @@ -308,89 +306,12 @@ public: _generator(generator), _name(name), _id(id), _validate(validate) {} virtual ~Agent() {} virtual RPC::Result receive(const Packet & packet) override = 0; - virtual void inSync() override {} virtual void eof() override {} virtual void failed() {} }; //----------------------------------------------------------------------------- -// SubscriberAgent -//----------------------------------------------------------------------------- -class SubscriberAgent : public Agent -{ -private: - std::unique_ptr<Subscriber> _subscriber; - SerialNum _from; - SerialNum _next; - Monitor _monitor; - - SerialNum getNext() { - MonitorGuard guard(_monitor); - return _next++; - } - -public: - SubscriberAgent(const std::string & tlsSpec, const std::string & domain, - const EntryGenerator & generator, SerialNum from, uint32_t id, bool validate) : - Agent(tlsSpec, domain, generator, "SubscriberAgent", id, validate), - _subscriber(), _from(from), _next(from + 1) {} - virtual ~SubscriberAgent() {} - void start(); - void stop(); - SerialNum getExpectedNext() const { - MonitorGuard guard(_monitor); - return _next; - } - SerialNumRange getRange() const { return SerialNumRange(_from, _next - 1); } - virtual RPC::Result receive(const Packet & packet) override; -}; - -void -SubscriberAgent::start() -{ - _subscriber = _client.createSubscriber(_domain, *this); - if (_subscriber.get() == NULL) { - throw std::runtime_error(vespalib::make_string - ("SubscriberAgent[%u]: Could not open subscriber to %s", _id, _tlsSpec.c_str())); - } - if (!_subscriber->subscribe(_from)) { - throw std::runtime_error(vespalib::make_string - ("SubscriberAgent[%u]: Could not subscribe to %s from serialnumber %" PRIu64, - _id, _tlsSpec.c_str(), _from)); - } -} - -void -SubscriberAgent::stop() -{ - _subscriber.reset(); -} - -RPC::Result -SubscriberAgent::receive(const Packet & packet) -{ - auto handle = packet.getHandle(); - while (handle.size() > 0) { - Packet::Entry entry; - entry.deserialize(handle); - Packet::Entry expected = _generator.getRandomEntry(getNext()); - if (_validate) { - if (!EntryComparator::cmp(entry, expected)) { - throw std::runtime_error(vespalib::make_string - ("SubscriberAgent[%u]: Got %s, expected %s", _id, - EntryPrinter::toStr(entry).c_str(), - EntryPrinter::toStr(expected).c_str())); - } - } - } - LOG(info, "SubscriberAgent[%u]: received %s", _id, PacketPrinter::toStr(packet).c_str()); - - return RPC::OK; -} - - -//----------------------------------------------------------------------------- // VisitorAgent //----------------------------------------------------------------------------- class VisitorAgent : public Agent @@ -534,7 +455,6 @@ private: TransLogClient _client; std::unique_ptr<ClientSession> _session; EntryGenerator _generator; - std::vector<std::shared_ptr<SubscriberAgent> > _subscribers; std::vector<std::shared_ptr<VisitorAgent> > _visitors; std::vector<std::shared_ptr<VisitorAgent> > _rndVisitors; uint64_t _visitorInterval; // in milliseconds @@ -548,29 +468,22 @@ private: void makeRandomVisitorVector(); public: - ControllerThread(const std::string & tlsSpec, const std::string & domain, - const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors, - uint64_t visitorInterval, uint64_t pruneInterval); + ControllerThread(const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, + uint32_t numVisitors, uint64_t visitorInterval, uint64_t pruneInterval); ~ControllerThread(); - void startSubscribers(); uint32_t runningVisitors(); - std::vector<std::shared_ptr<SubscriberAgent> > & getSubscribers() { return _subscribers; } std::vector<std::shared_ptr<VisitorAgent> > & getVisitors() { return _visitors; } virtual void doRun() override; }; ControllerThread::ControllerThread(const std::string & tlsSpec, const std::string & domain, - const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors, + const EntryGenerator & generator, uint32_t numVisitors, uint64_t visitorInterval, uint64_t pruneInterval) : _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec.c_str()), _session(), - _generator(generator), _subscribers(), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval), + _generator(generator), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval), _pruneInterval(pruneInterval), _pruneTimer(), _begin(0), _end(0), _count(0) { - for (uint32_t i = 0; i < numSubscribers; ++i) { - _subscribers.push_back(std::make_shared<SubscriberAgent>(tlsSpec, domain, generator, 0, i, true)); - } - for (uint32_t i = 0; i < numVisitors; ++i) { _visitors.push_back(std::make_shared<VisitorAgent>(tlsSpec, domain, generator, i, true)); } @@ -598,14 +511,6 @@ ControllerThread::makeRandomVisitorVector() } void -ControllerThread::startSubscribers() -{ - for (size_t i = 0; i < _subscribers.size(); ++i) { - _subscribers[i]->start(); - } -} - -void ControllerThread::doRun() { _session = _client.open(_domain); @@ -641,12 +546,6 @@ ControllerThread::doRun() safePrune = _visitors[i]->getFrom(); } } - for (size_t i = 0; i < _subscribers.size(); ++i) { - SerialNum next = _subscribers[i]->getExpectedNext(); - if (next < safePrune) { - safePrune = next; - } - } LOG(info, "ControllerThread: status: begin(%" PRIu64 "), end(%" PRIu64 "), count(%zu)", _begin, _end, _count); LOG(info, "ControllerThread: prune [%" PRIu64 ", %" PRIu64 ">", _begin, safePrune); if (!_session->erase(safePrune)) { @@ -672,7 +571,6 @@ private: uint64_t stressTime; uint32_t feedRate; - uint32_t numSubscribers; uint32_t numVisitors; uint64_t visitorInterval; uint64_t pruneInterval; @@ -683,7 +581,7 @@ private: long baseSeed; Config() : - domainPartSize(0), packetSize(0), stressTime(0), feedRate(0), numSubscribers(0), + domainPartSize(0), packetSize(0), stressTime(0), feedRate(0), numVisitors(0), visitorInterval(0), pruneInterval(0), minStrLen(0), maxStrLen(0), baseSeed(0) {} }; @@ -702,7 +600,6 @@ TransLogStress::printConfig() std::cout << "######## Config ########" << std::endl; std::cout << "stressTime: " << _cfg.stressTime / 1000 << " s" << std::endl; std::cout << "feedRate: " << _cfg.feedRate << " per/sec" << std::endl; - std::cout << "numSubscribers: " << _cfg.numSubscribers << std::endl; std::cout << "numVisitors: " << _cfg.numVisitors << std::endl; std::cout << "visitorInterval: " << _cfg.visitorInterval << " ms" << std::endl; std::cout << "pruneInterval: " << _cfg.pruneInterval / 1000 << " s" << std::endl; @@ -733,7 +630,6 @@ TransLogStress::Main() _cfg.stressTime = 1000 * 60; _cfg.feedRate = 10000; - _cfg.numSubscribers = 1; _cfg.numVisitors = 1; _cfg.visitorInterval = 1000 * 1; _cfg.pruneInterval = 1000 * 12; @@ -763,9 +659,6 @@ TransLogStress::Main() case 'f': _cfg.feedRate = atoi(arg); break; - case 's': - _cfg.numSubscribers = atoi(arg); - break; case 'v': _cfg.numVisitors = atoi(arg); break; @@ -830,13 +723,9 @@ TransLogStress::Main() FastOS_Thread::Sleep(sleepTime); - ControllerThread controller(tlsSpec, domain, generator, _cfg.numSubscribers, _cfg.numVisitors, - _cfg.visitorInterval, _cfg.pruneInterval); + ControllerThread controller(tlsSpec, domain, generator, _cfg.numVisitors, _cfg.visitorInterval, _cfg.pruneInterval); threadPool.NewThread(&controller); - // start subscribers - controller.startSubscribers(); - // stop feeder and controller FastOS_Thread::Sleep(_cfg.stressTime); printConfig(); @@ -862,24 +751,12 @@ TransLogStress::Main() std::cout << "</visitor>" << std::endl; } - // stop subscribers - LOG(info, "Stop subscribers..."); - std::vector<std::shared_ptr<SubscriberAgent> > & subscribers = controller.getSubscribers(); - for (size_t i = 0; i < subscribers.size(); ++i) { - subscribers[i]->stop(); - std::cout << "<subscriber id='" << i << "'>" << std::endl; - std::cout << " <from>" << subscribers[i]->getRange().from() << "</from>" << std::endl; - std::cout << " <to>" << subscribers[i]->getRange().to() << "</to>" << std::endl; - std::cout << "</subscriber>" << std::endl; - } - threadPool.Close(); return 0; } } -} int main(int argc, char ** argv) { |