aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src/tests
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-11 15:58:53 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-11 19:38:04 +0200
commit1aeded94c43e98b1b82c4a71530ea46ab2389bb2 (patch)
treec8cdfa5a401bbbd75133b44de95782cfb138d741 /searchlib/src/tests
parente3dea9daba5b89fab618921a94f9394f8ae9144c (diff)
Remove subscribe to TLS.
Diffstat (limited to 'searchlib/src/tests')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp60
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp137
2 files changed, 17 insertions, 180 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index efd3b70858a..9f83db9b23a 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -42,7 +42,6 @@ private:
uint32_t countFiles(const vespalib::string &dir);
void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
- bool subscribeDomainTest(TransLogClient & tls, const vespalib::string & name);
bool partialUpdateTest();
bool test1();
bool testRemove();
@@ -59,22 +58,20 @@ private:
TEST_APPHOOK(Test);
-class CallBackTest : public TransLogClient::Subscriber::Callback
+class CallBackTest : public TransLogClient::Visitor::Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
- virtual void inSync() override { _inSync = true; }
virtual void eof() override { _eof = true; }
typedef std::map<SerialNum, ByteBuffer> PacketMap;
PacketMap _packetMap;
public:
- CallBackTest() : _inSync(false), _eof(false) { }
+ CallBackTest() : _eof(false) { }
size_t size() const { return _packetMap.size(); }
bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); }
- void clear() { _inSync = false; _eof = false; _packetMap.clear(); }
+ void clear() { _eof = false; _packetMap.clear(); }
const ByteBuffer & packet(SerialNum n) { return (_packetMap.find(n)->second); }
- bool _inSync;
bool _eof;
};
@@ -91,16 +88,14 @@ RPC::Result CallBackTest::receive(const Packet & p)
return RPC::OK;
}
-class CallBackManyTest : public TransLogClient::Subscriber::Callback
+class CallBackManyTest : public TransLogClient::Visitor::Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
- virtual void inSync() override { _inSync = true; }
virtual void eof() override { _eof = true; }
public:
- CallBackManyTest(size_t start) : _inSync(false), _eof(false), _count(start), _value(start) { }
- void clear() { _inSync = false; _eof = false; _count = 0; _value = 0; }
- bool _inSync;
+ CallBackManyTest(size_t start) : _eof(false), _count(start), _value(start) { }
+ void clear() { _eof = false; _count = 0; _value = 0; }
bool _eof;
size_t _count;
size_t _value;
@@ -121,21 +116,19 @@ RPC::Result CallBackManyTest::receive(const Packet & p)
return RPC::OK;
}
-class CallBackUpdate : public TransLogClient::Subscriber::Callback
+class CallBackUpdate : public TransLogClient::Visitor::Callback
{
public:
typedef std::map<SerialNum, Identifiable *> PacketMap;
private:
virtual RPC::Result receive(const Packet & packet) override;
- virtual void inSync() override { _inSync = true; }
virtual void eof() override { _eof = true; }
PacketMap _packetMap;
public:
- CallBackUpdate() : _inSync(false), _eof(false) { }
+ CallBackUpdate() : _eof(false) { }
virtual ~CallBackUpdate() { while (_packetMap.begin() != _packetMap.end()) { delete _packetMap.begin()->second; _packetMap.erase(_packetMap.begin()); } }
bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); }
const PacketMap & map() const { return _packetMap; }
- bool _inSync;
bool _eof;
};
@@ -176,16 +169,14 @@ class CallBackStatsTest : public TransLogClient::Session::Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
- virtual void inSync() override { _inSync = true; }
virtual void eof() override { _eof = true; }
public:
- CallBackStatsTest() : _inSync(false), _eof(false),
+ CallBackStatsTest() : _eof(false),
_count(0), _inOrder(0),
_firstSerial(0), _lastSerial(0),
_prevSerial(0) { }
- void clear() { _inSync = false; _eof = false; _count = 0; _inOrder = 0;
+ void clear() { _eof = false; _count = 0; _inOrder = 0;
_firstSerial = 0; _lastSerial = 0; _inOrder = 0; }
- bool _inSync;
bool _eof;
uint64_t _count;
uint64_t _inOrder; // increase when next entry is one above previous
@@ -258,7 +249,6 @@ bool Test::partialUpdateTest()
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(5, 7) );
for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca._inSync );
ASSERT_TRUE( ca._eof );
ASSERT_TRUE( ca.map().size() == 1);
ASSERT_TRUE( ca.hasSerial(7) );
@@ -268,7 +258,6 @@ bool Test::partialUpdateTest()
ASSERT_TRUE(visitor1.get());
ASSERT_TRUE( visitor1->visit(4, 5) );
for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca1._inSync );
ASSERT_TRUE( ca1._eof );
ASSERT_TRUE( ca1.map().size() == 0);
@@ -277,7 +266,6 @@ bool Test::partialUpdateTest()
ASSERT_TRUE(visitor2.get());
ASSERT_TRUE( visitor2->visit(5, 6) );
for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca2._inSync );
ASSERT_TRUE( ca2._eof );
ASSERT_TRUE( ca2.map().size() == 0);
@@ -286,7 +274,6 @@ bool Test::partialUpdateTest()
ASSERT_TRUE(visitor3.get());
ASSERT_TRUE( visitor3->visit(5, 1000) );
for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca3._inSync );
ASSERT_TRUE( ca3._eof );
ASSERT_TRUE( ca3.map().size() == 1);
ASSERT_TRUE( ca3.hasSerial(7) );
@@ -451,7 +438,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 1) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- EXPECT_TRUE( ! ca._inSync );
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ca.hasSerial(1) );
@@ -462,7 +448,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(1, 2) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- EXPECT_TRUE( ! ca._inSync );
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ! ca.hasSerial(1) );
@@ -474,7 +459,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
EXPECT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 3) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- EXPECT_TRUE( ! ca._inSync );
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ca.hasSerial(1) );
@@ -486,7 +470,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(2, 3) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- EXPECT_TRUE( ! ca._inSync );
EXPECT_TRUE( ca._eof );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( !ca.hasSerial(1) );
@@ -497,23 +480,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c
return retval;
}
-bool Test::subscribeDomainTest(TransLogClient & tls, const vespalib::string & name)
-{
- bool retval(true);
- CallBackTest ca;
- TransLogClient::Subscriber::UP subscriber = tls.createSubscriber(name, ca);
- ASSERT_TRUE(subscriber.get());
- ASSERT_TRUE( subscriber->subscribe(0) );
- for (size_t i(0); ! ca._inSync && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ca._inSync );
- ASSERT_TRUE( ! ca.hasSerial(0) );
- ASSERT_TRUE( ! ca._eof );
- ASSERT_TRUE( ca.hasSerial(1) );
- ASSERT_TRUE( ca.hasSerial(2) );
- ASSERT_TRUE( ca.hasSerial(3) );
- return retval;
-}
-
bool Test::test1()
{
DummyFileHeaderContext fileHeaderContext;
@@ -525,7 +491,6 @@ bool Test::test1()
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
- subscribeDomainTest(tls, name);
return true;
}
@@ -569,7 +534,6 @@ bool Test::testRemove()
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
- subscribeDomainTest(tls, name);
ASSERT_TRUE(tls.remove(name));
return true;
@@ -584,7 +548,6 @@ bool Test::test2()
vespalib::string name("test1");
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
visitDomainTest(tls, s1.get(), name);
- subscribeDomainTest(tls, name);
return true;
}
@@ -603,7 +566,6 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain,
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) {
FastOS_Thread::Sleep(10);
}
- ASSERT_TRUE(!ca._inSync);
ASSERT_TRUE(ca._eof);
EXPECT_EQUAL(expFirstSerial, ca._firstSerial);
EXPECT_EQUAL(expLastSerial, ca._lastSerial);
@@ -651,7 +613,6 @@ void Test::testMany()
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca._inSync );
ASSERT_TRUE( ca._eof );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
@@ -673,7 +634,6 @@ void Test::testMany()
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); }
- ASSERT_TRUE( ! ca._inSync );
ASSERT_TRUE( ca._eof );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index aabfdb62a96..abba84b75b6 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -23,11 +23,9 @@ using vespalib::make_string;
using vespalib::ConstBufferRef;
using search::index::DummyFileHeaderContext;
-namespace search {
-namespace transactionlog {
+namespace search::transactionlog {
using ClientSession = TransLogClient::Session;
-using Subscriber = TransLogClient::Subscriber;
using Visitor = TransLogClient::Visitor;
//-----------------------------------------------------------------------------
@@ -308,89 +306,12 @@ public:
_generator(generator), _name(name), _id(id), _validate(validate) {}
virtual ~Agent() {}
virtual RPC::Result receive(const Packet & packet) override = 0;
- virtual void inSync() override {}
virtual void eof() override {}
virtual void failed() {}
};
//-----------------------------------------------------------------------------
-// SubscriberAgent
-//-----------------------------------------------------------------------------
-class SubscriberAgent : public Agent
-{
-private:
- std::unique_ptr<Subscriber> _subscriber;
- SerialNum _from;
- SerialNum _next;
- Monitor _monitor;
-
- SerialNum getNext() {
- MonitorGuard guard(_monitor);
- return _next++;
- }
-
-public:
- SubscriberAgent(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, SerialNum from, uint32_t id, bool validate) :
- Agent(tlsSpec, domain, generator, "SubscriberAgent", id, validate),
- _subscriber(), _from(from), _next(from + 1) {}
- virtual ~SubscriberAgent() {}
- void start();
- void stop();
- SerialNum getExpectedNext() const {
- MonitorGuard guard(_monitor);
- return _next;
- }
- SerialNumRange getRange() const { return SerialNumRange(_from, _next - 1); }
- virtual RPC::Result receive(const Packet & packet) override;
-};
-
-void
-SubscriberAgent::start()
-{
- _subscriber = _client.createSubscriber(_domain, *this);
- if (_subscriber.get() == NULL) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Could not open subscriber to %s", _id, _tlsSpec.c_str()));
- }
- if (!_subscriber->subscribe(_from)) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Could not subscribe to %s from serialnumber %" PRIu64,
- _id, _tlsSpec.c_str(), _from));
- }
-}
-
-void
-SubscriberAgent::stop()
-{
- _subscriber.reset();
-}
-
-RPC::Result
-SubscriberAgent::receive(const Packet & packet)
-{
- auto handle = packet.getHandle();
- while (handle.size() > 0) {
- Packet::Entry entry;
- entry.deserialize(handle);
- Packet::Entry expected = _generator.getRandomEntry(getNext());
- if (_validate) {
- if (!EntryComparator::cmp(entry, expected)) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Got %s, expected %s", _id,
- EntryPrinter::toStr(entry).c_str(),
- EntryPrinter::toStr(expected).c_str()));
- }
- }
- }
- LOG(info, "SubscriberAgent[%u]: received %s", _id, PacketPrinter::toStr(packet).c_str());
-
- return RPC::OK;
-}
-
-
-//-----------------------------------------------------------------------------
// VisitorAgent
//-----------------------------------------------------------------------------
class VisitorAgent : public Agent
@@ -534,7 +455,6 @@ private:
TransLogClient _client;
std::unique_ptr<ClientSession> _session;
EntryGenerator _generator;
- std::vector<std::shared_ptr<SubscriberAgent> > _subscribers;
std::vector<std::shared_ptr<VisitorAgent> > _visitors;
std::vector<std::shared_ptr<VisitorAgent> > _rndVisitors;
uint64_t _visitorInterval; // in milliseconds
@@ -548,29 +468,22 @@ private:
void makeRandomVisitorVector();
public:
- ControllerThread(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors,
- uint64_t visitorInterval, uint64_t pruneInterval);
+ ControllerThread(const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator,
+ uint32_t numVisitors, uint64_t visitorInterval, uint64_t pruneInterval);
~ControllerThread();
- void startSubscribers();
uint32_t runningVisitors();
- std::vector<std::shared_ptr<SubscriberAgent> > & getSubscribers() { return _subscribers; }
std::vector<std::shared_ptr<VisitorAgent> > & getVisitors() { return _visitors; }
virtual void doRun() override;
};
ControllerThread::ControllerThread(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors,
+ const EntryGenerator & generator, uint32_t numVisitors,
uint64_t visitorInterval, uint64_t pruneInterval)
: _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec.c_str()), _session(),
- _generator(generator), _subscribers(), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval),
+ _generator(generator), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval),
_pruneInterval(pruneInterval), _pruneTimer(), _begin(0), _end(0), _count(0)
{
- for (uint32_t i = 0; i < numSubscribers; ++i) {
- _subscribers.push_back(std::make_shared<SubscriberAgent>(tlsSpec, domain, generator, 0, i, true));
- }
-
for (uint32_t i = 0; i < numVisitors; ++i) {
_visitors.push_back(std::make_shared<VisitorAgent>(tlsSpec, domain, generator, i, true));
}
@@ -598,14 +511,6 @@ ControllerThread::makeRandomVisitorVector()
}
void
-ControllerThread::startSubscribers()
-{
- for (size_t i = 0; i < _subscribers.size(); ++i) {
- _subscribers[i]->start();
- }
-}
-
-void
ControllerThread::doRun()
{
_session = _client.open(_domain);
@@ -641,12 +546,6 @@ ControllerThread::doRun()
safePrune = _visitors[i]->getFrom();
}
}
- for (size_t i = 0; i < _subscribers.size(); ++i) {
- SerialNum next = _subscribers[i]->getExpectedNext();
- if (next < safePrune) {
- safePrune = next;
- }
- }
LOG(info, "ControllerThread: status: begin(%" PRIu64 "), end(%" PRIu64 "), count(%zu)", _begin, _end, _count);
LOG(info, "ControllerThread: prune [%" PRIu64 ", %" PRIu64 ">", _begin, safePrune);
if (!_session->erase(safePrune)) {
@@ -672,7 +571,6 @@ private:
uint64_t stressTime;
uint32_t feedRate;
- uint32_t numSubscribers;
uint32_t numVisitors;
uint64_t visitorInterval;
uint64_t pruneInterval;
@@ -683,7 +581,7 @@ private:
long baseSeed;
Config() :
- domainPartSize(0), packetSize(0), stressTime(0), feedRate(0), numSubscribers(0),
+ domainPartSize(0), packetSize(0), stressTime(0), feedRate(0),
numVisitors(0), visitorInterval(0), pruneInterval(0), minStrLen(0), maxStrLen(0), baseSeed(0) {}
};
@@ -702,7 +600,6 @@ TransLogStress::printConfig()
std::cout << "######## Config ########" << std::endl;
std::cout << "stressTime: " << _cfg.stressTime / 1000 << " s" << std::endl;
std::cout << "feedRate: " << _cfg.feedRate << " per/sec" << std::endl;
- std::cout << "numSubscribers: " << _cfg.numSubscribers << std::endl;
std::cout << "numVisitors: " << _cfg.numVisitors << std::endl;
std::cout << "visitorInterval: " << _cfg.visitorInterval << " ms" << std::endl;
std::cout << "pruneInterval: " << _cfg.pruneInterval / 1000 << " s" << std::endl;
@@ -733,7 +630,6 @@ TransLogStress::Main()
_cfg.stressTime = 1000 * 60;
_cfg.feedRate = 10000;
- _cfg.numSubscribers = 1;
_cfg.numVisitors = 1;
_cfg.visitorInterval = 1000 * 1;
_cfg.pruneInterval = 1000 * 12;
@@ -763,9 +659,6 @@ TransLogStress::Main()
case 'f':
_cfg.feedRate = atoi(arg);
break;
- case 's':
- _cfg.numSubscribers = atoi(arg);
- break;
case 'v':
_cfg.numVisitors = atoi(arg);
break;
@@ -830,13 +723,9 @@ TransLogStress::Main()
FastOS_Thread::Sleep(sleepTime);
- ControllerThread controller(tlsSpec, domain, generator, _cfg.numSubscribers, _cfg.numVisitors,
- _cfg.visitorInterval, _cfg.pruneInterval);
+ ControllerThread controller(tlsSpec, domain, generator, _cfg.numVisitors, _cfg.visitorInterval, _cfg.pruneInterval);
threadPool.NewThread(&controller);
- // start subscribers
- controller.startSubscribers();
-
// stop feeder and controller
FastOS_Thread::Sleep(_cfg.stressTime);
printConfig();
@@ -862,24 +751,12 @@ TransLogStress::Main()
std::cout << "</visitor>" << std::endl;
}
- // stop subscribers
- LOG(info, "Stop subscribers...");
- std::vector<std::shared_ptr<SubscriberAgent> > & subscribers = controller.getSubscribers();
- for (size_t i = 0; i < subscribers.size(); ++i) {
- subscribers[i]->stop();
- std::cout << "<subscriber id='" << i << "'>" << std::endl;
- std::cout << " <from>" << subscribers[i]->getRange().from() << "</from>" << std::endl;
- std::cout << " <to>" << subscribers[i]->getRange().to() << "</to>" << std::endl;
- std::cout << "</subscriber>" << std::endl;
- }
-
threadPool.Close();
return 0;
}
}
-}
int main(int argc, char ** argv)
{