From 72231250ed81e10d66bfe70701e64fa5fe50f712 Mon Sep 17 00:00:00 2001 From: Jon Bratseth Date: Wed, 15 Jun 2016 23:09:44 +0200 Subject: Publish --- .../tests/transactionlogstress/translogstress.cpp | 875 +++++++++++++++++++++ 1 file changed, 875 insertions(+) create mode 100644 searchlib/src/tests/transactionlogstress/translogstress.cpp (limited to 'searchlib/src/tests/transactionlogstress/translogstress.cpp') diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp new file mode 100644 index 00000000000..1c51c81e633 --- /dev/null +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -0,0 +1,875 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +LOG_SETUP("translogstress"); + +using document::ByteBuffer; +using search::Runnable; +using vespalib::Monitor; +using vespalib::MonitorGuard; +using std::shared_ptr; +using vespalib::make_string; +using vespalib::ConstBufferRef; +using search::index::DummyFileHeaderContext; + +namespace search { +namespace transactionlog { + +using ClientSession = TransLogClient::Session; +using Subscriber = TransLogClient::Subscriber; +using Visitor = TransLogClient::Visitor; + +//----------------------------------------------------------------------------- +// BufferGenerator +//----------------------------------------------------------------------------- +class BufferGenerator +{ +private: + Rand48 _rnd; + uint32_t _minStrLen; + uint32_t _maxStrLen; + +public: + BufferGenerator() : + _rnd(), _minStrLen(0), _maxStrLen(0) {} + BufferGenerator(uint32_t minStrLen, uint32_t maxStrLen) : + _rnd(), _minStrLen(minStrLen), _maxStrLen(maxStrLen) {} + void setSeed(long seed) { _rnd.srand48(seed); } + ByteBuffer getRandomBuffer(); +}; + +ByteBuffer +BufferGenerator::getRandomBuffer() +{ + size_t len = _minStrLen + _rnd.lrand48() % (_maxStrLen - _minStrLen); + std::string str; + for (size_t i = 0; i < len; ++i) { + char c = 'a' + _rnd.lrand48() % ('z' - 'a' + 1); + str.push_back(c); + } + ByteBuffer buf(str.size() + 1); + buf.putBytes(str.c_str(), str.size() + 1); + buf.flip(); + return buf; +} + + +//----------------------------------------------------------------------------- +// EntryGenerator +//----------------------------------------------------------------------------- +class EntryGenerator +{ +private: + Rand48 _rnd; + long _baseSeed; + BufferGenerator _bufferGenerator; + const std::vector * _buffers; + ByteBuffer _lastGeneratedBuffer; + +public: + EntryGenerator(long baseSeed, const BufferGenerator & bufferGenerator) : + _rnd(), _baseSeed(baseSeed), _bufferGenerator(bufferGenerator), _buffers(NULL), + _lastGeneratedBuffer() {} + EntryGenerator(const EntryGenerator & rhs) : + _rnd(), _baseSeed(rhs._baseSeed), _bufferGenerator(rhs._bufferGenerator), + _buffers(rhs._buffers), _lastGeneratedBuffer(rhs._lastGeneratedBuffer) {} + EntryGenerator & operator=(const EntryGenerator & rhs) { + _rnd = rhs._rnd; + _baseSeed = rhs._baseSeed; + _bufferGenerator = rhs._bufferGenerator; + _buffers = rhs._buffers; + return *this; + }; + SerialNum getRandomSerialNum(SerialNum begin, SerialNum end); + Packet::Entry getRandomEntry(SerialNum num); + Rand48 & getRnd() { return _rnd; } + void setBuffers(const std::vector & buffers) { + _buffers = &buffers; + } +}; + +SerialNum +EntryGenerator::getRandomSerialNum(SerialNum begin, SerialNum end) +{ + // return random number in range [begin, end] + assert(begin <= end); + if (begin == end) { + return SerialNum(begin); + } else { + return SerialNum(begin + _rnd.lrand48() % (end - begin + 1)); + } +} + +Packet::Entry +EntryGenerator::getRandomEntry(SerialNum num) +{ + _rnd.srand48(_baseSeed + num); + if (_buffers != NULL) { + size_t i = _rnd.lrand48() % _buffers->size(); + const ByteBuffer& buffer = (*_buffers)[i]; + return Packet::Entry(num, 1024, ConstBufferRef(buffer.getBuffer(), buffer.getLength())); + } else { + _bufferGenerator.setSeed(_baseSeed + num); + _lastGeneratedBuffer = _bufferGenerator.getRandomBuffer(); + return Packet::Entry(num, 1024, ConstBufferRef(_lastGeneratedBuffer.getBuffer(), _lastGeneratedBuffer.getLength())); + } +} + + +//----------------------------------------------------------------------------- +// EntryComparator +//----------------------------------------------------------------------------- +class EntryComparator +{ +public: + static bool cmp(const Packet::Entry & lhs, const Packet::Entry & rhs) { + if (lhs.serial() != rhs.serial()) { + return false; + } + if (lhs.type() != rhs.type()) { + return false; + } + if (lhs.data().size() != rhs.data().size()) { + return false; + } + if (memcmp(lhs.data().c_str(), rhs.data().c_str(), lhs.data().size()) != 0) { + return false; + } + return true; + } +}; + + +//----------------------------------------------------------------------------- +// EntryPrinter +//----------------------------------------------------------------------------- +class EntryPrinter +{ +public: + static std::string toStr(const Packet::Entry & e) { + std::stringstream ss; + ss << "Entry(serial(" << e.serial() << "), type(" << e.type() << "), bufferSize(" << + e.data().size() << "), buffer("; + for (size_t i = 0; i < e.data().size() - 1; ++i) { + ss << e.data().c_str()[i]; + } + ss << ")"; + return ss.str(); + } +}; + + +//----------------------------------------------------------------------------- +// PacketPrinter +//----------------------------------------------------------------------------- +class PacketPrinter +{ +public: + static std::string toStr(const Packet & p) { + std::stringstream ss; + ss << "Packet(entries(" << p.size() << "), range([" << p.range().from() << ", " << p.range().to() + << "]), bytes(" << p.getHandle().size() << "))"; + return ss.str(); + } +}; + + +//----------------------------------------------------------------------------- +// FeederThread +//----------------------------------------------------------------------------- +class FeederThread : public Runnable +{ +private: + std::string _tlsSpec; + std::string _domain; + TransLogClient _client; + std::unique_ptr _session; + EntryGenerator _generator; + uint32_t _feedRate; + Packet _packet; + SerialNum _current; + SerialNum _lastCommited; + FastOS_Time _timer; + + void commitPacket(); + bool addEntry(const Packet::Entry & e); + +public: + FeederThread(const std::string & tlsSpec, const std::string & domain, + const EntryGenerator & generator, uint32_t feedRate, size_t packetSize) : + _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec), _session(), + _generator(generator), _feedRate(feedRate), _packet(packetSize), _current(1), _lastCommited(1), _timer() {} + virtual void doRun(); + SerialNumRange getRange() const { return SerialNumRange(1, _lastCommited); } +}; + +void +FeederThread::commitPacket() +{ + _packet.close(); + const vespalib::nbostream& stream = _packet.getHandle(); + if (!_session->commit(ConstBufferRef(stream.c_str(), stream.size()))) { + throw std::runtime_error(vespalib::make_string + ("FeederThread: Failed commiting %s", PacketPrinter::toStr(_packet).c_str())); + } else { + LOG(info, "FeederThread: commited %s", PacketPrinter::toStr(_packet).c_str()); + } + _packet.clear(); + _lastCommited = _current - 1; +} + +bool +FeederThread::addEntry(const Packet::Entry & e) +{ + //LOG(info, "FeederThread: add %s", EntryPrinter::toStr(e).c_str()); + return _packet.add(e); +} + +void +FeederThread::doRun() +{ + _session = _client.open(_domain); + if (_session.get() == NULL) { + throw std::runtime_error(vespalib::make_string("FeederThread: Could not open session to %s", _tlsSpec.c_str())); + } + + while (!_done) { + if (_feedRate != 0) { + _timer.SetNow(); + for (uint32_t i = 0; i < _feedRate; ++i) { + Packet::Entry entry = _generator.getRandomEntry(_current++); + if (!addEntry(entry)) { + commitPacket(); + if (!addEntry(entry)) { + throw std::runtime_error(vespalib::make_string + ("FeederThread: Could not add %s", EntryPrinter::toStr(entry).c_str())); + } + } + } + commitPacket(); + + uint64_t milliSecsUsed = static_cast(_timer.MilliSecsToNow()); + if (milliSecsUsed < 1000) { + //LOG(info, "FeederThread: sleep %u ms", 1000 - milliSecsUsed); + FastOS_Thread::Sleep(1000 - milliSecsUsed); + } else { + LOG(info, "FeederThread: max throughput"); + } + } else { + Packet::Entry entry = _generator.getRandomEntry(_current++); + if (!addEntry(entry)) { + commitPacket(); + if (!addEntry(entry)) { + throw std::runtime_error(vespalib::make_string + ("FeederThread: Could not add %s", EntryPrinter::toStr(entry).c_str())); + } + } + } + } +} + + +//----------------------------------------------------------------------------- +// Agent +//----------------------------------------------------------------------------- +class Agent : public ClientSession::Callback +{ +protected: + std::string _tlsSpec; + std::string _domain; + TransLogClient _client; + EntryGenerator _generator; + std::string _name; + uint32_t _id; + bool _validate; + +public: + Agent(const std::string & tlsSpec, const std::string & domain, + const EntryGenerator & generator, const std::string & name, uint32_t id, bool validate) : + ClientSession::Callback(), + _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec), + _generator(generator), _name(name), _id(id), _validate(validate) {} + virtual ~Agent() {} + virtual RPC::Result receive(const Packet & packet) = 0; + virtual void inSync() {} + virtual void eof() {} + virtual void failed() {} +}; + + +//----------------------------------------------------------------------------- +// SubscriberAgent +//----------------------------------------------------------------------------- +class SubscriberAgent : public Agent +{ +private: + std::unique_ptr _subscriber; + SerialNum _from; + SerialNum _next; + Monitor _monitor; + + SerialNum getNext() { + MonitorGuard guard(_monitor); + return _next++; + } + +public: + SubscriberAgent(const std::string & tlsSpec, const std::string & domain, + const EntryGenerator & generator, SerialNum from, uint32_t id, bool validate) : + Agent(tlsSpec, domain, generator, "SubscriberAgent", id, validate), + _subscriber(), _from(from), _next(from + 1) {} + virtual ~SubscriberAgent() {} + void start(); + void stop(); + SerialNum getExpectedNext() const { + MonitorGuard guard(_monitor); + return _next; + } + SerialNumRange getRange() const { return SerialNumRange(_from, _next - 1); } + virtual RPC::Result receive(const Packet & packet); +}; + +void +SubscriberAgent::start() +{ + _subscriber = _client.createSubscriber(_domain, *this); + if (_subscriber.get() == NULL) { + throw std::runtime_error(vespalib::make_string + ("SubscriberAgent[%u]: Could not open subscriber to %s", _id, _tlsSpec.c_str())); + } + if (!_subscriber->subscribe(_from)) { + throw std::runtime_error(vespalib::make_string + ("SubscriberAgent[%u]: Could not subscribe to %s from serialnumber %" PRIu64, + _id, _tlsSpec.c_str(), _from)); + } +} + +void +SubscriberAgent::stop() +{ + _subscriber.reset(); +} + +RPC::Result +SubscriberAgent::receive(const Packet & packet) +{ + auto handle = packet.getHandle(); + while (handle.size() > 0) { + Packet::Entry entry; + entry.deserialize(handle); + Packet::Entry expected = _generator.getRandomEntry(getNext()); + if (_validate) { + if (!EntryComparator::cmp(entry, expected)) { + throw std::runtime_error(vespalib::make_string + ("SubscriberAgent[%u]: Got %s, expected %s", _id, + EntryPrinter::toStr(entry).c_str(), + EntryPrinter::toStr(expected).c_str())); + } + } + } + LOG(info, "SubscriberAgent[%u]: received %s", _id, PacketPrinter::toStr(packet).c_str()); + + return RPC::OK; +} + + +//----------------------------------------------------------------------------- +// VisitorAgent +//----------------------------------------------------------------------------- +class VisitorAgent : public Agent +{ +private: + enum State { + IDLE, RUNNING, FINISHED + }; + std::unique_ptr _visitor; + SerialNum _from; + SerialNum _to; + SerialNum _next; + bool _running; + State _state; + Monitor _monitor; + + void setState(State newState) { + MonitorGuard guard(_monitor); + //LOG(info, "VisitorAgent[%u]: setState(%s)", _id, newState == IDLE ? "idle" : + // (newState == RUNNING ? "running" : "finished")); + _state = newState; + } + SerialNum getNext(); + +public: + VisitorAgent(const std::string & tlsSpec, const std::string & domain, + const EntryGenerator & generator, uint32_t id, bool validate) : + Agent(tlsSpec, domain, generator, "VisitorAgent", id, validate), + _visitor(), _from(0), _to(0), _next(0), _state(IDLE) {} + virtual ~VisitorAgent() {} + void start(SerialNum from, SerialNum to); + void setIdle(); + bool idle() { + MonitorGuard guard(_monitor); + return _state == IDLE; + } + bool running() { + MonitorGuard guard(_monitor); + return _state == RUNNING; + } + bool finished() { + MonitorGuard guard(_monitor); + return _state == FINISHED; + } + std::string getState() { + MonitorGuard guard(_monitor); + if (_state == IDLE) { + return std::string("idle"); + } else if (_state == FINISHED) { + return std::string("finished"); + } else { + return std::string("running"); + } + } + SerialNum getFrom() { return _from; } + virtual RPC::Result receive(const Packet & packet); + virtual void eof() { + LOG(info, "VisitorAgent[%u]: eof", _id); + setState(FINISHED); + } +}; + +SerialNum +VisitorAgent::getNext() +{ + SerialNum retval = _next++; + if (retval > _to) { + throw std::runtime_error(make_string("VisitorAgent[%u]: SerialNum (%" PRIu64 ") outside " + "expected range <%" PRIu64 ", %" PRIu64 "]", _id, + retval, _from, _to)); + } + return retval; +} + +void +VisitorAgent::start(SerialNum from, SerialNum to) +{ + assert(idle()); + LOG(info, "VisitorAgent[%u]: start<%" PRIu64 ", %" PRIu64 "]", _id, from, to); + _from = from; + _to = to; + _next = from + 1; + _visitor = _client.createVisitor(_domain, *this); + if (_visitor.get() == NULL) { + throw std::runtime_error(vespalib::make_string + ("VisitorAgent[%u]: Could not open visitor to %s", _id, _tlsSpec.c_str())); + } + setState(RUNNING); + if (!_visitor->visit(_from, _to)) { + throw std::runtime_error(vespalib::make_string + ("VisitorAgent[%u]: Could not visit from %s with range <%" PRIu64 ", %" PRIu64 "]", + _id, _tlsSpec.c_str(), _from, _to)); + } +} + +void +VisitorAgent::setIdle() +{ + assert(finished()); + _visitor.reset(); + setState(IDLE); +} + +RPC::Result +VisitorAgent::receive(const Packet & packet) +{ + auto handle = packet.getHandle(); + while (handle.size() > 0) { + Packet::Entry entry; + entry.deserialize(handle); + Packet::Entry expected = _generator.getRandomEntry(getNext()); + if (_validate) { + if (!EntryComparator::cmp(entry, expected)) { + throw std::runtime_error(vespalib::make_string + ("VisitorAgent[%u]: Got %s, expected %s", _id, + EntryPrinter::toStr(entry).c_str(), + EntryPrinter::toStr(expected).c_str())); + } + } + } + + if (_next > _to + 1) { + throw std::runtime_error(vespalib::make_string + ("VisitorAgent[%u]: Visited range <%" PRIu64 ", %" PRIu64 "], expected " + "range <%" PRIu64 "u, %" PRIu64 "]", _id, + _from, _next - 1, _from, _to)); + } + + return RPC::OK; +} + + +//----------------------------------------------------------------------------- +// ControllerThread +//----------------------------------------------------------------------------- +class ControllerThread : public Runnable +{ +private: + std::string _tlsSpec; + std::string _domain; + TransLogClient _client; + std::unique_ptr _session; + EntryGenerator _generator; + std::vector > _subscribers; + std::vector > _visitors; + std::vector > _rndVisitors; + uint64_t _visitorInterval; // in milliseconds + uint64_t _pruneInterval; // in milliseconds + FastOS_Time _pruneTimer; + SerialNum _begin; + SerialNum _end; + size_t _count; + + void getStatus(); + void makeRandomVisitorVector(); + +public: + ControllerThread(const std::string & tlsSpec, const std::string & domain, + const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors, + uint64_t visitorInterval, uint64_t pruneInterval) : + _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec.c_str()), _session(), + _generator(generator), _subscribers(), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval), + _pruneInterval(pruneInterval), _pruneTimer(), _begin(0), _end(0), _count(0) + { + for (uint32_t i = 0; i < numSubscribers; ++i) { + _subscribers.push_back(std::shared_ptr + (new SubscriberAgent(tlsSpec, domain, generator, 0, i, true))); + } + + for (uint32_t i = 0; i < numVisitors; ++i) { + _visitors.push_back(std::shared_ptr(new VisitorAgent(tlsSpec, domain, generator, i, true))); + } + } + void startSubscribers(); + uint32_t runningVisitors(); + std::vector > & getSubscribers() { return _subscribers; } + std::vector > & getVisitors() { return _visitors; } + virtual void doRun(); + +}; + +void +ControllerThread::getStatus() +{ + if (!_session->status(_begin, _end, _count)) { + throw std::runtime_error(vespalib::make_string("ControllerThread: Could not get status from %s", _tlsSpec.c_str())); + } +} + +void +ControllerThread::makeRandomVisitorVector() +{ + std::vector > tmp(_visitors); + _rndVisitors.clear(); + while (tmp.size() > 0) { + size_t i = _generator.getRnd().lrand48() % tmp.size(); + _rndVisitors.push_back(tmp[i]); + tmp.erase(tmp.begin() + i); + } +} + +void +ControllerThread::startSubscribers() +{ + for (size_t i = 0; i < _subscribers.size(); ++i) { + _subscribers[i]->start(); + } +} + +void +ControllerThread::doRun() +{ + _session = _client.open(_domain); + if (_session.get() == NULL) { + throw std::runtime_error(vespalib::make_string("ControllerThread: Could not open session to %s", _tlsSpec.c_str())); + } + + _pruneTimer.SetNow(); + while (!_done) { + // set finished visitors as idle + for (size_t i = 0; i < _visitors.size(); ++i) { + if (_visitors[i]->finished()) { + _visitors[i]->setIdle(); + } + } + // find idle visitor + makeRandomVisitorVector(); + for (size_t i = 0; i < _rndVisitors.size(); ++i) { + if (_rndVisitors[i]->idle()) { + getStatus(); + SerialNum from = _generator.getRandomSerialNum(_begin, _end) - 1; + SerialNum to = _generator.getRandomSerialNum(from + 1, _end); + _rndVisitors[i]->start(from, to); + break; + } + } + // prune transaction log server + if (_pruneTimer.MilliSecsToNow() > _pruneInterval) { + getStatus(); + SerialNum safePrune = _end; + for (size_t i = 0; i < _visitors.size(); ++i) { + if (_visitors[i]->running() && _visitors[i]->getFrom() < safePrune) { + safePrune = _visitors[i]->getFrom(); + } + } + for (size_t i = 0; i < _subscribers.size(); ++i) { + SerialNum next = _subscribers[i]->getExpectedNext(); + if (next < safePrune) { + safePrune = next; + } + } + LOG(info, "ControllerThread: status: begin(%" PRIu64 "), end(%" PRIu64 "), count(%zu)", _begin, _end, _count); + LOG(info, "ControllerThread: prune [%" PRIu64 ", %" PRIu64 ">", _begin, safePrune); + if (!_session->erase(safePrune)) { + throw std::runtime_error(vespalib::make_string("ControllerThread: Could not erase up to %" PRIu64, safePrune)); + } + _pruneTimer.SetNow(); + } + FastOS_Thread::Sleep(_visitorInterval); + } +} + + +//----------------------------------------------------------------------------- +// TransLogStress +//----------------------------------------------------------------------------- +class TransLogStress : public FastOS_Application +{ +private: + class Config { + public: + uint64_t domainPartSize; + size_t packetSize; + + uint64_t stressTime; + uint32_t feedRate; + uint32_t numSubscribers; + uint32_t numVisitors; + uint64_t visitorInterval; + uint64_t pruneInterval; + + uint32_t numPreGeneratedBuffers; + uint32_t minStrLen; + uint32_t maxStrLen; + long baseSeed; + + Config() : + domainPartSize(0), packetSize(0), stressTime(0), feedRate(0), numSubscribers(0), + numVisitors(0), visitorInterval(0), pruneInterval(0), minStrLen(0), maxStrLen(0), baseSeed(0) {} + }; + + Config _cfg; + + void printConfig(); + void usage(); + +public: + int Main(); +}; + +void +TransLogStress::printConfig() +{ + std::cout << "######## Config ########" << std::endl; + std::cout << "stressTime: " << _cfg.stressTime / 1000 << " s" << std::endl; + std::cout << "feedRate: " << _cfg.feedRate << " per/sec" << std::endl; + std::cout << "numSubscribers: " << _cfg.numSubscribers << std::endl; + std::cout << "numVisitors: " << _cfg.numVisitors << std::endl; + std::cout << "visitorInterval: " << _cfg.visitorInterval << " ms" << std::endl; + std::cout << "pruneInterval: " << _cfg.pruneInterval / 1000 << " s" << std::endl; + std::cout << "numPreGeneratedBuffers: " << _cfg.numPreGeneratedBuffers << std::endl; + std::cout << "minStrLen: " << _cfg.minStrLen << std::endl; + std::cout << "maxStrLen: " << _cfg.maxStrLen << std::endl; + std::cout << "baseSeed: " << _cfg.baseSeed << std::endl; + std::cout << "domainPartSize: " << _cfg.domainPartSize << " bytes" << std::endl; + std::cout << "packetSize: " << _cfg.packetSize << " bytes" << std::endl; +} + +void +TransLogStress::usage() +{ + std::cout << "usage: translogstress [-t stressTime(s)] [-f feedRate] [-s numSubscribers]" << std::endl; + std::cout << " [-v numVisitors] [-c visitorInterval(ms)] [-e pruneInterval(s)]" << std::endl; + std::cout << " [-g numPreGeneratedBuffers] [-i minStrLen] [-a maxStrLen] [-b baseSeed]" << std::endl; + std::cout << " [-d domainPartSize] [-p packetSize]" << std::endl; +} + +int +TransLogStress::Main() +{ + std::string tlsSpec("tcp/localhost:17897"); + std::string domain("translogstress"); + _cfg.domainPartSize = 8000000; // ~8MB + _cfg.packetSize = 0x10000; + + _cfg.stressTime = 1000 * 60; + _cfg.feedRate = 10000; + _cfg.numSubscribers = 1; + _cfg.numVisitors = 1; + _cfg.visitorInterval = 1000 * 1; + _cfg.pruneInterval = 1000 * 12; + + _cfg.numPreGeneratedBuffers = 0; + _cfg.minStrLen = 40; + _cfg.maxStrLen = 80; + _cfg.baseSeed = 100; + + uint64_t sleepTime = 4000; + + int idx = 1; + char opt; + const char * arg; + bool optError = false; + while ((opt = GetOpt("d:p:t:f:s:v:c:e:g:i:a:b:h", arg, idx)) != -1) { + switch (opt) { + case 'd': + _cfg.domainPartSize = atol(arg); + break; + case 'p': + _cfg.packetSize = atol(arg); + break; + case 't': + _cfg.stressTime = 1000 * atol(arg); + break; + case 'f': + _cfg.feedRate = atoi(arg); + break; + case 's': + _cfg.numSubscribers = atoi(arg); + break; + case 'v': + _cfg.numVisitors = atoi(arg); + break; + case 'c': + _cfg.visitorInterval = atol(arg); + break; + case 'e': + _cfg.pruneInterval = 1000 * atol(arg); + break; + case 'g': + _cfg.numPreGeneratedBuffers = atoi(arg); + break; + case 'i': + _cfg.minStrLen = atoi(arg); + break; + case 'a': + _cfg.maxStrLen = atoi(arg); + break; + case 'b': + _cfg.baseSeed = atol(arg); + break; + case 'h': + usage(); + return -1; + default: + optError = true; + break; + } + } + + printConfig(); + FastOS_Thread::Sleep(sleepTime); + + if (_argc != idx || optError) { + usage(); + return -1; + } + + // start transaction log server + DummyFileHeaderContext fileHeaderContext; + TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize); + TransLogClient client(tlsSpec); + client.create(domain); + + FastOS_ThreadPool threadPool(256000); + + BufferGenerator bufferGenerator(_cfg.minStrLen, _cfg.maxStrLen); + bufferGenerator.setSeed(_cfg.baseSeed); + std::vector buffers; + for (uint32_t i = 0; i < _cfg.numPreGeneratedBuffers; ++i) { + buffers.push_back(bufferGenerator.getRandomBuffer()); + } + EntryGenerator generator(_cfg.baseSeed, bufferGenerator); + if (buffers.size() > 0) { + generator.setBuffers(buffers); + } + + + // start feeder and controller + FeederThread feeder(tlsSpec, domain, generator, _cfg.feedRate, _cfg.packetSize); + threadPool.NewThread(&feeder); + + FastOS_Thread::Sleep(sleepTime); + + ControllerThread controller(tlsSpec, domain, generator, _cfg.numSubscribers, _cfg.numVisitors, + _cfg.visitorInterval, _cfg.pruneInterval); + threadPool.NewThread(&controller); + + // start subscribers + controller.startSubscribers(); + + // stop feeder and controller + FastOS_Thread::Sleep(_cfg.stressTime); + printConfig(); + LOG(info, "Stop feeder..."); + feeder.stop(); + feeder.join(); + std::cout << "" << std::endl; + std::cout << " " << feeder.getRange().from() << "" << std::endl; + std::cout << " " << feeder.getRange().to() << "" << std::endl; + std::cout << " " << 1000 * (feeder.getRange().to() - feeder.getRange().from()) / (sleepTime + _cfg.stressTime) + << "" << std::endl; + std::cout << "" << std::endl; + + LOG(info, "Stop controller..."); + controller.stop(); + controller.join(); + + FastOS_Thread::Sleep(sleepTime); + std::vector > & visitors = controller.getVisitors(); + for (size_t i = 0; i < visitors.size(); ++i) { + std::cout << "" << std::endl; + std::cout << "" << visitors[i]->getState() << "" << std::endl; + std::cout << "" << std::endl; + } + + // stop subscribers + LOG(info, "Stop subscribers..."); + std::vector > & subscribers = controller.getSubscribers(); + for (size_t i = 0; i < subscribers.size(); ++i) { + subscribers[i]->stop(); + std::cout << "" << std::endl; + std::cout << " " << subscribers[i]->getRange().from() << "" << std::endl; + std::cout << " " << subscribers[i]->getRange().to() << "" << std::endl; + std::cout << "" << std::endl; + } + + threadPool.Close(); + + return 0; +} + +} +} + +int main(int argc, char ** argv) +{ + search::transactionlog::TransLogStress myApp; + return myApp.Entry(argc, argv); +} -- cgit v1.2.3