aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src/tests/transactionlogstress/translogstress.cpp
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-11 15:58:53 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-11 19:38:04 +0200
commit1aeded94c43e98b1b82c4a71530ea46ab2389bb2 (patch)
treec8cdfa5a401bbbd75133b44de95782cfb138d741 /searchlib/src/tests/transactionlogstress/translogstress.cpp
parente3dea9daba5b89fab618921a94f9394f8ae9144c (diff)
Remove subscribe to TLS.
Diffstat (limited to 'searchlib/src/tests/transactionlogstress/translogstress.cpp')
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp137
1 files changed, 7 insertions, 130 deletions
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index aabfdb62a96..abba84b75b6 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -23,11 +23,9 @@ using vespalib::make_string;
using vespalib::ConstBufferRef;
using search::index::DummyFileHeaderContext;
-namespace search {
-namespace transactionlog {
+namespace search::transactionlog {
using ClientSession = TransLogClient::Session;
-using Subscriber = TransLogClient::Subscriber;
using Visitor = TransLogClient::Visitor;
//-----------------------------------------------------------------------------
@@ -308,89 +306,12 @@ public:
_generator(generator), _name(name), _id(id), _validate(validate) {}
virtual ~Agent() {}
virtual RPC::Result receive(const Packet & packet) override = 0;
- virtual void inSync() override {}
virtual void eof() override {}
virtual void failed() {}
};
//-----------------------------------------------------------------------------
-// SubscriberAgent
-//-----------------------------------------------------------------------------
-class SubscriberAgent : public Agent
-{
-private:
- std::unique_ptr<Subscriber> _subscriber;
- SerialNum _from;
- SerialNum _next;
- Monitor _monitor;
-
- SerialNum getNext() {
- MonitorGuard guard(_monitor);
- return _next++;
- }
-
-public:
- SubscriberAgent(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, SerialNum from, uint32_t id, bool validate) :
- Agent(tlsSpec, domain, generator, "SubscriberAgent", id, validate),
- _subscriber(), _from(from), _next(from + 1) {}
- virtual ~SubscriberAgent() {}
- void start();
- void stop();
- SerialNum getExpectedNext() const {
- MonitorGuard guard(_monitor);
- return _next;
- }
- SerialNumRange getRange() const { return SerialNumRange(_from, _next - 1); }
- virtual RPC::Result receive(const Packet & packet) override;
-};
-
-void
-SubscriberAgent::start()
-{
- _subscriber = _client.createSubscriber(_domain, *this);
- if (_subscriber.get() == NULL) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Could not open subscriber to %s", _id, _tlsSpec.c_str()));
- }
- if (!_subscriber->subscribe(_from)) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Could not subscribe to %s from serialnumber %" PRIu64,
- _id, _tlsSpec.c_str(), _from));
- }
-}
-
-void
-SubscriberAgent::stop()
-{
- _subscriber.reset();
-}
-
-RPC::Result
-SubscriberAgent::receive(const Packet & packet)
-{
- auto handle = packet.getHandle();
- while (handle.size() > 0) {
- Packet::Entry entry;
- entry.deserialize(handle);
- Packet::Entry expected = _generator.getRandomEntry(getNext());
- if (_validate) {
- if (!EntryComparator::cmp(entry, expected)) {
- throw std::runtime_error(vespalib::make_string
- ("SubscriberAgent[%u]: Got %s, expected %s", _id,
- EntryPrinter::toStr(entry).c_str(),
- EntryPrinter::toStr(expected).c_str()));
- }
- }
- }
- LOG(info, "SubscriberAgent[%u]: received %s", _id, PacketPrinter::toStr(packet).c_str());
-
- return RPC::OK;
-}
-
-
-//-----------------------------------------------------------------------------
// VisitorAgent
//-----------------------------------------------------------------------------
class VisitorAgent : public Agent
@@ -534,7 +455,6 @@ private:
TransLogClient _client;
std::unique_ptr<ClientSession> _session;
EntryGenerator _generator;
- std::vector<std::shared_ptr<SubscriberAgent> > _subscribers;
std::vector<std::shared_ptr<VisitorAgent> > _visitors;
std::vector<std::shared_ptr<VisitorAgent> > _rndVisitors;
uint64_t _visitorInterval; // in milliseconds
@@ -548,29 +468,22 @@ private:
void makeRandomVisitorVector();
public:
- ControllerThread(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors,
- uint64_t visitorInterval, uint64_t pruneInterval);
+ ControllerThread(const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator,
+ uint32_t numVisitors, uint64_t visitorInterval, uint64_t pruneInterval);
~ControllerThread();
- void startSubscribers();
uint32_t runningVisitors();
- std::vector<std::shared_ptr<SubscriberAgent> > & getSubscribers() { return _subscribers; }
std::vector<std::shared_ptr<VisitorAgent> > & getVisitors() { return _visitors; }
virtual void doRun() override;
};
ControllerThread::ControllerThread(const std::string & tlsSpec, const std::string & domain,
- const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors,
+ const EntryGenerator & generator, uint32_t numVisitors,
uint64_t visitorInterval, uint64_t pruneInterval)
: _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec.c_str()), _session(),
- _generator(generator), _subscribers(), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval),
+ _generator(generator), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval),
_pruneInterval(pruneInterval), _pruneTimer(), _begin(0), _end(0), _count(0)
{
- for (uint32_t i = 0; i < numSubscribers; ++i) {
- _subscribers.push_back(std::make_shared<SubscriberAgent>(tlsSpec, domain, generator, 0, i, true));
- }
-
for (uint32_t i = 0; i < numVisitors; ++i) {
_visitors.push_back(std::make_shared<VisitorAgent>(tlsSpec, domain, generator, i, true));
}
@@ -598,14 +511,6 @@ ControllerThread::makeRandomVisitorVector()
}
void
-ControllerThread::startSubscribers()
-{
- for (size_t i = 0; i < _subscribers.size(); ++i) {
- _subscribers[i]->start();
- }
-}
-
-void
ControllerThread::doRun()
{
_session = _client.open(_domain);
@@ -641,12 +546,6 @@ ControllerThread::doRun()
safePrune = _visitors[i]->getFrom();
}
}
- for (size_t i = 0; i < _subscribers.size(); ++i) {
- SerialNum next = _subscribers[i]->getExpectedNext();
- if (next < safePrune) {
- safePrune = next;
- }
- }
LOG(info, "ControllerThread: status: begin(%" PRIu64 "), end(%" PRIu64 "), count(%zu)", _begin, _end, _count);
LOG(info, "ControllerThread: prune [%" PRIu64 ", %" PRIu64 ">", _begin, safePrune);
if (!_session->erase(safePrune)) {
@@ -672,7 +571,6 @@ private:
uint64_t stressTime;
uint32_t feedRate;
- uint32_t numSubscribers;
uint32_t numVisitors;
uint64_t visitorInterval;
uint64_t pruneInterval;
@@ -683,7 +581,7 @@ private:
long baseSeed;
Config() :
- domainPartSize(0), packetSize(0), stressTime(0), feedRate(0), numSubscribers(0),
+ domainPartSize(0), packetSize(0), stressTime(0), feedRate(0),
numVisitors(0), visitorInterval(0), pruneInterval(0), minStrLen(0), maxStrLen(0), baseSeed(0) {}
};
@@ -702,7 +600,6 @@ TransLogStress::printConfig()
std::cout << "######## Config ########" << std::endl;
std::cout << "stressTime: " << _cfg.stressTime / 1000 << " s" << std::endl;
std::cout << "feedRate: " << _cfg.feedRate << " per/sec" << std::endl;
- std::cout << "numSubscribers: " << _cfg.numSubscribers << std::endl;
std::cout << "numVisitors: " << _cfg.numVisitors << std::endl;
std::cout << "visitorInterval: " << _cfg.visitorInterval << " ms" << std::endl;
std::cout << "pruneInterval: " << _cfg.pruneInterval / 1000 << " s" << std::endl;
@@ -733,7 +630,6 @@ TransLogStress::Main()
_cfg.stressTime = 1000 * 60;
_cfg.feedRate = 10000;
- _cfg.numSubscribers = 1;
_cfg.numVisitors = 1;
_cfg.visitorInterval = 1000 * 1;
_cfg.pruneInterval = 1000 * 12;
@@ -763,9 +659,6 @@ TransLogStress::Main()
case 'f':
_cfg.feedRate = atoi(arg);
break;
- case 's':
- _cfg.numSubscribers = atoi(arg);
- break;
case 'v':
_cfg.numVisitors = atoi(arg);
break;
@@ -830,13 +723,9 @@ TransLogStress::Main()
FastOS_Thread::Sleep(sleepTime);
- ControllerThread controller(tlsSpec, domain, generator, _cfg.numSubscribers, _cfg.numVisitors,
- _cfg.visitorInterval, _cfg.pruneInterval);
+ ControllerThread controller(tlsSpec, domain, generator, _cfg.numVisitors, _cfg.visitorInterval, _cfg.pruneInterval);
threadPool.NewThread(&controller);
- // start subscribers
- controller.startSubscribers();
-
// stop feeder and controller
FastOS_Thread::Sleep(_cfg.stressTime);
printConfig();
@@ -862,24 +751,12 @@ TransLogStress::Main()
std::cout << "</visitor>" << std::endl;
}
- // stop subscribers
- LOG(info, "Stop subscribers...");
- std::vector<std::shared_ptr<SubscriberAgent> > & subscribers = controller.getSubscribers();
- for (size_t i = 0; i < subscribers.size(); ++i) {
- subscribers[i]->stop();
- std::cout << "<subscriber id='" << i << "'>" << std::endl;
- std::cout << " <from>" << subscribers[i]->getRange().from() << "</from>" << std::endl;
- std::cout << " <to>" << subscribers[i]->getRange().to() << "</to>" << std::endl;
- std::cout << "</subscriber>" << std::endl;
- }
-
threadPool.Close();
return 0;
}
}
-}
int main(int argc, char ** argv)
{