diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-11 15:58:53 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-11 19:38:04 +0200 |
commit | 1aeded94c43e98b1b82c4a71530ea46ab2389bb2 (patch) | |
tree | c8cdfa5a401bbbd75133b44de95782cfb138d741 /searchlib | |
parent | e3dea9daba5b89fab618921a94f9394f8ae9144c (diff) |
Remove subscribe to TLS.
Diffstat (limited to 'searchlib')
10 files changed, 37 insertions, 358 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index efd3b70858a..9f83db9b23a 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -42,7 +42,6 @@ private: uint32_t countFiles(const vespalib::string &dir); void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); - bool subscribeDomainTest(TransLogClient & tls, const vespalib::string & name); bool partialUpdateTest(); bool test1(); bool testRemove(); @@ -59,22 +58,20 @@ private: TEST_APPHOOK(Test); -class CallBackTest : public TransLogClient::Subscriber::Callback +class CallBackTest : public TransLogClient::Visitor::Callback { private: virtual RPC::Result receive(const Packet & packet) override; - virtual void inSync() override { _inSync = true; } virtual void eof() override { _eof = true; } typedef std::map<SerialNum, ByteBuffer> PacketMap; PacketMap _packetMap; public: - CallBackTest() : _inSync(false), _eof(false) { } + CallBackTest() : _eof(false) { } size_t size() const { return _packetMap.size(); } bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); } - void clear() { _inSync = false; _eof = false; _packetMap.clear(); } + void clear() { _eof = false; _packetMap.clear(); } const ByteBuffer & packet(SerialNum n) { return (_packetMap.find(n)->second); } - bool _inSync; bool _eof; }; @@ -91,16 +88,14 @@ RPC::Result CallBackTest::receive(const Packet & p) return RPC::OK; } -class CallBackManyTest : public TransLogClient::Subscriber::Callback +class CallBackManyTest : public TransLogClient::Visitor::Callback { private: virtual RPC::Result receive(const Packet & packet) override; - virtual void inSync() override { _inSync = true; } virtual void eof() override { _eof = true; } public: - CallBackManyTest(size_t start) : _inSync(false), _eof(false), _count(start), _value(start) { } - void clear() { _inSync = false; _eof = false; _count = 0; _value = 0; } - bool _inSync; + CallBackManyTest(size_t start) : _eof(false), _count(start), _value(start) { } + void clear() { _eof = false; _count = 0; _value = 0; } bool _eof; size_t _count; size_t _value; @@ -121,21 +116,19 @@ RPC::Result CallBackManyTest::receive(const Packet & p) return RPC::OK; } -class CallBackUpdate : public TransLogClient::Subscriber::Callback +class CallBackUpdate : public TransLogClient::Visitor::Callback { public: typedef std::map<SerialNum, Identifiable *> PacketMap; private: virtual RPC::Result receive(const Packet & packet) override; - virtual void inSync() override { _inSync = true; } virtual void eof() override { _eof = true; } PacketMap _packetMap; public: - CallBackUpdate() : _inSync(false), _eof(false) { } + CallBackUpdate() : _eof(false) { } virtual ~CallBackUpdate() { while (_packetMap.begin() != _packetMap.end()) { delete _packetMap.begin()->second; _packetMap.erase(_packetMap.begin()); } } bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); } const PacketMap & map() const { return _packetMap; } - bool _inSync; bool _eof; }; @@ -176,16 +169,14 @@ class CallBackStatsTest : public TransLogClient::Session::Callback { private: virtual RPC::Result receive(const Packet & packet) override; - virtual void inSync() override { _inSync = true; } virtual void eof() override { _eof = true; } public: - CallBackStatsTest() : _inSync(false), _eof(false), + CallBackStatsTest() : _eof(false), _count(0), _inOrder(0), _firstSerial(0), _lastSerial(0), _prevSerial(0) { } - void clear() { _inSync = false; _eof = false; _count = 0; _inOrder = 0; + void clear() { _eof = false; _count = 0; _inOrder = 0; _firstSerial = 0; _lastSerial = 0; _inOrder = 0; } - bool _inSync; bool _eof; uint64_t _count; uint64_t _inOrder; // increase when next entry is one above previous @@ -258,7 +249,6 @@ bool Test::partialUpdateTest() ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(5, 7) ); for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca._inSync ); ASSERT_TRUE( ca._eof ); ASSERT_TRUE( ca.map().size() == 1); ASSERT_TRUE( ca.hasSerial(7) ); @@ -268,7 +258,6 @@ bool Test::partialUpdateTest() ASSERT_TRUE(visitor1.get()); ASSERT_TRUE( visitor1->visit(4, 5) ); for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca1._inSync ); ASSERT_TRUE( ca1._eof ); ASSERT_TRUE( ca1.map().size() == 0); @@ -277,7 +266,6 @@ bool Test::partialUpdateTest() ASSERT_TRUE(visitor2.get()); ASSERT_TRUE( visitor2->visit(5, 6) ); for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca2._inSync ); ASSERT_TRUE( ca2._eof ); ASSERT_TRUE( ca2.map().size() == 0); @@ -286,7 +274,6 @@ bool Test::partialUpdateTest() ASSERT_TRUE(visitor3.get()); ASSERT_TRUE( visitor3->visit(5, 1000) ); for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca3._inSync ); ASSERT_TRUE( ca3._eof ); ASSERT_TRUE( ca3.map().size() == 1); ASSERT_TRUE( ca3.hasSerial(7) ); @@ -451,7 +438,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(0, 1) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - EXPECT_TRUE( ! ca._inSync ); EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ca.hasSerial(1) ); @@ -462,7 +448,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(1, 2) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - EXPECT_TRUE( ! ca._inSync ); EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ! ca.hasSerial(1) ); @@ -474,7 +459,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c EXPECT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(0, 3) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - EXPECT_TRUE( ! ca._inSync ); EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( ca.hasSerial(1) ); @@ -486,7 +470,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(2, 3) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - EXPECT_TRUE( ! ca._inSync ); EXPECT_TRUE( ca._eof ); EXPECT_TRUE( ! ca.hasSerial(0) ); EXPECT_TRUE( !ca.hasSerial(1) ); @@ -497,23 +480,6 @@ bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, c return retval; } -bool Test::subscribeDomainTest(TransLogClient & tls, const vespalib::string & name) -{ - bool retval(true); - CallBackTest ca; - TransLogClient::Subscriber::UP subscriber = tls.createSubscriber(name, ca); - ASSERT_TRUE(subscriber.get()); - ASSERT_TRUE( subscriber->subscribe(0) ); - for (size_t i(0); ! ca._inSync && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ca._inSync ); - ASSERT_TRUE( ! ca.hasSerial(0) ); - ASSERT_TRUE( ! ca._eof ); - ASSERT_TRUE( ca.hasSerial(1) ); - ASSERT_TRUE( ca.hasSerial(2) ); - ASSERT_TRUE( ca.hasSerial(3) ); - return retval; -} - bool Test::test1() { DummyFileHeaderContext fileHeaderContext; @@ -525,7 +491,6 @@ bool Test::test1() TransLogClient::Session::UP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); - subscribeDomainTest(tls, name); return true; } @@ -569,7 +534,6 @@ bool Test::testRemove() TransLogClient::Session::UP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); - subscribeDomainTest(tls, name); ASSERT_TRUE(tls.remove(name)); return true; @@ -584,7 +548,6 @@ bool Test::test2() vespalib::string name("test1"); TransLogClient::Session::UP s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); - subscribeDomainTest(tls, name); return true; } @@ -603,7 +566,6 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain, for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE(!ca._inSync); ASSERT_TRUE(ca._eof); EXPECT_EQUAL(expFirstSerial, ca._firstSerial); EXPECT_EQUAL(expLastSerial, ca._lastSerial); @@ -651,7 +613,6 @@ void Test::testMany() ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca._inSync ); ASSERT_TRUE( ca._eof ); EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); @@ -673,7 +634,6 @@ void Test::testMany() ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { FastOS_Thread::Sleep(10); } - ASSERT_TRUE( ! ca._inSync ); ASSERT_TRUE( ca._eof ); EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index aabfdb62a96..abba84b75b6 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -23,11 +23,9 @@ using vespalib::make_string; using vespalib::ConstBufferRef; using search::index::DummyFileHeaderContext; -namespace search { -namespace transactionlog { +namespace search::transactionlog { using ClientSession = TransLogClient::Session; -using Subscriber = TransLogClient::Subscriber; using Visitor = TransLogClient::Visitor; //----------------------------------------------------------------------------- @@ -308,89 +306,12 @@ public: _generator(generator), _name(name), _id(id), _validate(validate) {} virtual ~Agent() {} virtual RPC::Result receive(const Packet & packet) override = 0; - virtual void inSync() override {} virtual void eof() override {} virtual void failed() {} }; //----------------------------------------------------------------------------- -// SubscriberAgent -//----------------------------------------------------------------------------- -class SubscriberAgent : public Agent -{ -private: - std::unique_ptr<Subscriber> _subscriber; - SerialNum _from; - SerialNum _next; - Monitor _monitor; - - SerialNum getNext() { - MonitorGuard guard(_monitor); - return _next++; - } - -public: - SubscriberAgent(const std::string & tlsSpec, const std::string & domain, - const EntryGenerator & generator, SerialNum from, uint32_t id, bool validate) : - Agent(tlsSpec, domain, generator, "SubscriberAgent", id, validate), - _subscriber(), _from(from), _next(from + 1) {} - virtual ~SubscriberAgent() {} - void start(); - void stop(); - SerialNum getExpectedNext() const { - MonitorGuard guard(_monitor); - return _next; - } - SerialNumRange getRange() const { return SerialNumRange(_from, _next - 1); } - virtual RPC::Result receive(const Packet & packet) override; -}; - -void -SubscriberAgent::start() -{ - _subscriber = _client.createSubscriber(_domain, *this); - if (_subscriber.get() == NULL) { - throw std::runtime_error(vespalib::make_string - ("SubscriberAgent[%u]: Could not open subscriber to %s", _id, _tlsSpec.c_str())); - } - if (!_subscriber->subscribe(_from)) { - throw std::runtime_error(vespalib::make_string - ("SubscriberAgent[%u]: Could not subscribe to %s from serialnumber %" PRIu64, - _id, _tlsSpec.c_str(), _from)); - } -} - -void -SubscriberAgent::stop() -{ - _subscriber.reset(); -} - -RPC::Result -SubscriberAgent::receive(const Packet & packet) -{ - auto handle = packet.getHandle(); - while (handle.size() > 0) { - Packet::Entry entry; - entry.deserialize(handle); - Packet::Entry expected = _generator.getRandomEntry(getNext()); - if (_validate) { - if (!EntryComparator::cmp(entry, expected)) { - throw std::runtime_error(vespalib::make_string - ("SubscriberAgent[%u]: Got %s, expected %s", _id, - EntryPrinter::toStr(entry).c_str(), - EntryPrinter::toStr(expected).c_str())); - } - } - } - LOG(info, "SubscriberAgent[%u]: received %s", _id, PacketPrinter::toStr(packet).c_str()); - - return RPC::OK; -} - - -//----------------------------------------------------------------------------- // VisitorAgent //----------------------------------------------------------------------------- class VisitorAgent : public Agent @@ -534,7 +455,6 @@ private: TransLogClient _client; std::unique_ptr<ClientSession> _session; EntryGenerator _generator; - std::vector<std::shared_ptr<SubscriberAgent> > _subscribers; std::vector<std::shared_ptr<VisitorAgent> > _visitors; std::vector<std::shared_ptr<VisitorAgent> > _rndVisitors; uint64_t _visitorInterval; // in milliseconds @@ -548,29 +468,22 @@ private: void makeRandomVisitorVector(); public: - ControllerThread(const std::string & tlsSpec, const std::string & domain, - const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors, - uint64_t visitorInterval, uint64_t pruneInterval); + ControllerThread(const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, + uint32_t numVisitors, uint64_t visitorInterval, uint64_t pruneInterval); ~ControllerThread(); - void startSubscribers(); uint32_t runningVisitors(); - std::vector<std::shared_ptr<SubscriberAgent> > & getSubscribers() { return _subscribers; } std::vector<std::shared_ptr<VisitorAgent> > & getVisitors() { return _visitors; } virtual void doRun() override; }; ControllerThread::ControllerThread(const std::string & tlsSpec, const std::string & domain, - const EntryGenerator & generator, uint32_t numSubscribers, uint32_t numVisitors, + const EntryGenerator & generator, uint32_t numVisitors, uint64_t visitorInterval, uint64_t pruneInterval) : _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec.c_str()), _session(), - _generator(generator), _subscribers(), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval), + _generator(generator), _visitors(), _rndVisitors(), _visitorInterval(visitorInterval), _pruneInterval(pruneInterval), _pruneTimer(), _begin(0), _end(0), _count(0) { - for (uint32_t i = 0; i < numSubscribers; ++i) { - _subscribers.push_back(std::make_shared<SubscriberAgent>(tlsSpec, domain, generator, 0, i, true)); - } - for (uint32_t i = 0; i < numVisitors; ++i) { _visitors.push_back(std::make_shared<VisitorAgent>(tlsSpec, domain, generator, i, true)); } @@ -598,14 +511,6 @@ ControllerThread::makeRandomVisitorVector() } void -ControllerThread::startSubscribers() -{ - for (size_t i = 0; i < _subscribers.size(); ++i) { - _subscribers[i]->start(); - } -} - -void ControllerThread::doRun() { _session = _client.open(_domain); @@ -641,12 +546,6 @@ ControllerThread::doRun() safePrune = _visitors[i]->getFrom(); } } - for (size_t i = 0; i < _subscribers.size(); ++i) { - SerialNum next = _subscribers[i]->getExpectedNext(); - if (next < safePrune) { - safePrune = next; - } - } LOG(info, "ControllerThread: status: begin(%" PRIu64 "), end(%" PRIu64 "), count(%zu)", _begin, _end, _count); LOG(info, "ControllerThread: prune [%" PRIu64 ", %" PRIu64 ">", _begin, safePrune); if (!_session->erase(safePrune)) { @@ -672,7 +571,6 @@ private: uint64_t stressTime; uint32_t feedRate; - uint32_t numSubscribers; uint32_t numVisitors; uint64_t visitorInterval; uint64_t pruneInterval; @@ -683,7 +581,7 @@ private: long baseSeed; Config() : - domainPartSize(0), packetSize(0), stressTime(0), feedRate(0), numSubscribers(0), + domainPartSize(0), packetSize(0), stressTime(0), feedRate(0), numVisitors(0), visitorInterval(0), pruneInterval(0), minStrLen(0), maxStrLen(0), baseSeed(0) {} }; @@ -702,7 +600,6 @@ TransLogStress::printConfig() std::cout << "######## Config ########" << std::endl; std::cout << "stressTime: " << _cfg.stressTime / 1000 << " s" << std::endl; std::cout << "feedRate: " << _cfg.feedRate << " per/sec" << std::endl; - std::cout << "numSubscribers: " << _cfg.numSubscribers << std::endl; std::cout << "numVisitors: " << _cfg.numVisitors << std::endl; std::cout << "visitorInterval: " << _cfg.visitorInterval << " ms" << std::endl; std::cout << "pruneInterval: " << _cfg.pruneInterval / 1000 << " s" << std::endl; @@ -733,7 +630,6 @@ TransLogStress::Main() _cfg.stressTime = 1000 * 60; _cfg.feedRate = 10000; - _cfg.numSubscribers = 1; _cfg.numVisitors = 1; _cfg.visitorInterval = 1000 * 1; _cfg.pruneInterval = 1000 * 12; @@ -763,9 +659,6 @@ TransLogStress::Main() case 'f': _cfg.feedRate = atoi(arg); break; - case 's': - _cfg.numSubscribers = atoi(arg); - break; case 'v': _cfg.numVisitors = atoi(arg); break; @@ -830,13 +723,9 @@ TransLogStress::Main() FastOS_Thread::Sleep(sleepTime); - ControllerThread controller(tlsSpec, domain, generator, _cfg.numSubscribers, _cfg.numVisitors, - _cfg.visitorInterval, _cfg.pruneInterval); + ControllerThread controller(tlsSpec, domain, generator, _cfg.numVisitors, _cfg.visitorInterval, _cfg.pruneInterval); threadPool.NewThread(&controller); - // start subscribers - controller.startSubscribers(); - // stop feeder and controller FastOS_Thread::Sleep(_cfg.stressTime); printConfig(); @@ -862,24 +751,12 @@ TransLogStress::Main() std::cout << "</visitor>" << std::endl; } - // stop subscribers - LOG(info, "Stop subscribers..."); - std::vector<std::shared_ptr<SubscriberAgent> > & subscribers = controller.getSubscribers(); - for (size_t i = 0; i < subscribers.size(); ++i) { - subscribers[i]->stop(); - std::cout << "<subscriber id='" << i << "'>" << std::endl; - std::cout << " <from>" << subscribers[i]->getRange().from() << "</from>" << std::endl; - std::cout << " <to>" << subscribers[i]->getRange().to() << "</to>" << std::endl; - std::cout << "</subscriber>" << std::endl; - } - threadPool.Close(); return 0; } } -} int main(int argc, char ** argv) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 5d66dded0dd..e46b8744ae8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -253,7 +253,7 @@ void Domain::cleanSessions() LockGuard guard(_sessionLock); for (SessionList::iterator it(_sessions.begin()), mt(_sessions.end()); it != mt; ) { Session * session(it->second.get()); - if ((!session->continous() && session->inSync())) { + if (session->inSync()) { _sessions.erase(it++); } else if (session->finished()) { _sessions.erase(it++); @@ -295,16 +295,6 @@ void Domain::commit(const Packet & packet) } dp->commit(entry.serial(), packet); cleanSessions(); - - LockGuard guard(_sessionLock); - for (auto & it : _sessions) { - const Session::SP & session(it.second); - if (session->continous()) { - if (session->ok()) { - Session::enQ(session, entry.serial(), packet); - } - } - } } bool Domain::erase(SerialNum to) @@ -378,18 +368,6 @@ int Domain::closeSession(int sessionId) return retval; } -int Domain::subscribe(const Domain::SP & domain, SerialNum from, FRT_Supervisor & supervisor, FNET_Connection *conn) -{ - assert(this == domain.get()); - cleanSessions(); - SerialNumRange range(from, end()); - Session * session = new Session(_sessionId++, range, domain, supervisor, conn, true); - LockGuard guard(_sessionLock); - _sessions[session->id()] = Session::SP(session); - return session->id(); -} - - Domain::SerialNumList Domain::scanDir() { diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index dfd201d182f..ab7ded91e5b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -49,7 +49,6 @@ public: void commit(const Packet & packet); int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn); - int subscribe(const Domain::SP & self, SerialNum from, FRT_Supervisor & supervisor, FNET_Connection *conn); SerialNum begin() const; SerialNum end() const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index 302e0c12dda..3ea656be9a2 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -19,17 +19,7 @@ namespace { vespalib::Executor::Task::UP Session::createTask(const Session::SP & session) { - if (session->continous()) { - return Task::UP(new SubscribeTask(session)); - } else { - return Task::UP(new VisitTask(session)); - } -} - -void -Session::SubscribeTask::run() -{ - _session->subscribe(); + return Task::UP(new VisitTask(session)); } void @@ -110,14 +100,6 @@ Session::enQ(const SP & session, SerialNum serial, const Packet & packet) } void -Session::subscribe() -{ - visit(); - sendPending(); - sendSync(); -} - -void Session::sendPending() { for (;;) { @@ -147,7 +129,7 @@ void Session::finalize() { if (!ok()) { - LOG(error, "[%d] : Error in %s(%" PRIu64 " - %" PRIu64 "), stopping since I have no idea on what to do.", _id, (continous() ? "subscriber" : "visitor"), _range.from(), _range.to()); + LOG(error, "[%d] : Error in %s(%" PRIu64 " - %" PRIu64 "), stopping since I have no idea on what to do.", _id, "visitor", _range.from(), _range.to()); } LOG(debug, "[%d] : Stopped %" PRIu64 " - %" PRIu64, _id, _range.from(), _range.to()); _finished = true; @@ -206,13 +188,12 @@ Session::rpcAsync(FRT_RPCRequest * req) } Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d, - FRT_Supervisor & supervisor, FNET_Connection *conn, bool subscriber) : + FRT_Supervisor & supervisor, FNET_Connection *conn) : _supervisor(supervisor), _connection(conn), _domain(d), _range(r), _id(sId), - _subscriber(subscriber), _inSync(false), _ok(true), _finished(false), @@ -254,19 +235,6 @@ Session::send(FRT_RPCRequest * req, bool wait) } bool -Session::sendSync() -{ - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); - req->SetMethodName("syncCallback"); - req->GetParams()->AddString(_domain->name().c_str()); - req->GetParams()->AddInt32(id()); - bool retval(send(req, true)); - LockGuard guard(_lock); - _inSync = true; - return retval; -} - -bool Session::sendDone() { FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index 26e448540c3..ac6f496e151 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -13,7 +13,7 @@ namespace search::transactionlog { class Domain; class DomainPart; -typedef std::shared_ptr<Domain> DomainSP; +using DomainSP = std::shared_ptr<Domain>; class Session : public FRT_IRequestWait { @@ -24,12 +24,11 @@ public: typedef std::shared_ptr<Session> SP; Session(const Session &) = delete; Session & operator = (const Session &) = delete; - Session(int sId, const SerialNumRange & r, const DomainSP & d, FRT_Supervisor & supervisor, FNET_Connection *conn, bool subscriber=false); - virtual ~Session(); + Session(int sId, const SerialNumRange & r, const DomainSP & d, FRT_Supervisor & supervisor, FNET_Connection *conn); + ~Session(); const SerialNumRange & range() const { return _range; } int id() const { return _id; } bool inSync() const; - bool continous() const { return _subscriber; } bool ok() const { return _ok; } bool finished() const; static void enQ(const SP & session, SerialNum serial, const Packet & packet); @@ -51,13 +50,7 @@ private: void run() override; Session::SP _session; }; - class SubscribeTask : public Task { - public: - SubscribeTask(const Session::SP & session) : _session(session) { } - private: - void run() override; - Session::SP _session; - }; + class SendTask : public Task { public: SendTask(const Session::SP & session) : _session(session) { } @@ -70,11 +63,9 @@ private: bool send(const Packet & packet); void sendPacket(SerialNum serial, const Packet & packet); bool sendDone(); - bool sendSync(); void sendPending(); void visit(); void visitOnly(); - void subscribe(); void finalize(); bool visit(FastOS_FileInterface & file, DomainPart & dp) __attribute__((noinline)); int32_t rpc(FRT_RPCRequest * req); @@ -84,7 +75,6 @@ private: DomainSP _domain; SerialNumRange _range; int _id; - bool _subscriber; bool _inSync; bool _ok; bool _finished; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp index af2e8ad47a1..aa2b558ea0c 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp @@ -88,12 +88,8 @@ TransLogClient::Session::UP TransLogClient::open(const vespalib::string & domain return session; } -TransLogClient::Subscriber::UP TransLogClient::createSubscriber(const vespalib::string & domain, TransLogClient::Session::Callback & callBack) -{ - return TransLogClient::Subscriber::UP(new Subscriber(domain, *this, callBack)); -} - -TransLogClient::Visitor::UP TransLogClient::createVisitor(const vespalib::string & domain, TransLogClient::Session::Callback & callBack) +TransLogClient::Visitor::UP +TransLogClient::createVisitor(const vespalib::string & domain, TransLogClient::Session::Callback & callBack) { return TransLogClient::Visitor::UP(new Visitor(domain, *this, callBack)); } @@ -151,13 +147,6 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor) rb.ReturnDesc("result", "A resultcode(int) of the operation. Non zero number indicates error."); //-- Visit Callbacks ----------------------------------------------------------- - rb.DefineMethod("syncCallback", "si", "i", false, FRT_METHOD(TransLogClient::syncCallbackRPC), this); - rb.MethodDesc("Will tell you that now you are uptodate on the subscribtion."); - rb.ParamDesc("name", "The name of the domain."); - rb.ParamDesc("session", "Session handle."); - rb.ReturnDesc("result", "A resultcode(int) of the operation. Non zero number indicates error."); - - //-- Visit Callbacks ----------------------------------------------------------- rb.DefineMethod("eofCallback", "si", "i", false, FRT_METHOD(TransLogClient::eofCallbackRPC), this); rb.MethodDesc("Will tell you that you are done with the visitor."); rb.ParamDesc("name", "The name of the domain."); @@ -182,24 +171,6 @@ void TransLogClient::visitCallbackRPC(FRT_RPCRequest *req) LOG(debug, "visitCallback(%s, %d)=%d done", domainName, sessionId, retval); } -void TransLogClient::syncCallbackRPC(FRT_RPCRequest *req) -{ - uint32_t retval(uint32_t(-1)); - FRT_Values & params = *req->GetParams(); - FRT_Values & ret = *req->GetReturn(); - const char * domainName = params[0]._string._str; - int32_t sessionId(params[1]._intval32); - LOG(debug, "syncCallback(%s, %d)", domainName, sessionId); - LockGuard guard(_lock); - Session * session(findSession(domainName, sessionId)); - if (session != NULL) { - session->inSync(); - retval = 0; - } - ret.AddInt32(retval); - LOG(debug, "syncCallback(%s, %d)=%d done", domainName, sessionId, retval); -} - void TransLogClient::eofCallbackRPC(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); @@ -322,21 +293,12 @@ int TransLogClient::SessionKey::cmp(const TransLogClient::SessionKey & b) const return diff; } -TransLogClient::Subscriber::Subscriber(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) : +TransLogClient::Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) : Session(domain, tlc), _callback(callBack) { } -TransLogClient::Subscriber::~Subscriber() -{ -} - -TransLogClient::Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) : - Subscriber(domain, tlc, callBack) -{ -} - bool TransLogClient::Session::init(FRT_RPCRequest *req) { int32_t retval(_tlc.rpc(req)); @@ -364,15 +326,6 @@ bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to return init(req); } -bool TransLogClient::Subscriber::subscribe(const SerialNum & from) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainSubscribe"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt64(from); - return init(req); -} - bool TransLogClient::Session::run() { FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); @@ -402,8 +355,6 @@ bool TransLogClient::Session::close() return (retval == 0); } -TransLogClient::Visitor::~Visitor() -{ -} +TransLogClient::Visitor::~Visitor() = default; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h index a3351549df8..87901890673 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h @@ -28,7 +28,6 @@ public: public: virtual ~Callback() { } virtual RPC::Result receive(const Packet & packet) = 0; - virtual void inSync() { } virtual void eof() { } }; public: @@ -46,7 +45,6 @@ public: bool sync(const SerialNum &syncTo, SerialNum &syncedTo); virtual RPC::Result visit(const Packet & ) { return RPC::OK; } - virtual void inSync() { } virtual void eof() { } bool close(); void clear(); @@ -60,32 +58,22 @@ public: int _sessionId; }; /// Here you connect to the incomming data getting everything from <from> - class Subscriber : public Session + class Visitor : public Session { public: - typedef std::unique_ptr<Subscriber> UP; - typedef std::shared_ptr<Subscriber> SP; + typedef std::unique_ptr<Visitor> UP; + typedef std::shared_ptr<Visitor> SP; - Subscriber(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack); - bool subscribe(const SerialNum & from); - ~Subscriber(); + Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack); + bool visit(const SerialNum & from, const SerialNum & to); + virtual ~Visitor(); RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); } - void inSync() override { _callback.inSync(); } void eof() override { _callback.eof(); } private: Callback & _callback; }; /// Here you read the incomming data getting everything from <from> - class Visitor : public Subscriber - { - public: - typedef std::unique_ptr<Visitor> UP; - typedef std::shared_ptr<Visitor> SP; - Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack); - bool visit(const SerialNum & from, const SerialNum & to); - virtual ~Visitor(); - }; public: typedef std::unique_ptr<TransLogClient> UP; @@ -100,8 +88,6 @@ public: Session::UP open(const vespalib::string & domain); /// Here you can get a list of available domains. bool listDomains(std::vector<vespalib::string> & dir); - /// Here you get a subscriber - Subscriber::UP createSubscriber(const vespalib::string & domain, Session::Callback & callBack); Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack); bool isConnected() const; @@ -111,7 +97,6 @@ public: private: void exportRPC(FRT_Supervisor & supervisor); void visitCallbackRPC(FRT_RPCRequest *req); - void syncCallbackRPC(FRT_RPCRequest *req); void eofCallbackRPC(FRT_RPCRequest *req); int32_t rpc(FRT_RPCRequest * req); Session * findSession(const vespalib::string & domain, int sessionId); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index e1fc4399613..1eb6301f3f5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -166,8 +166,6 @@ void TransLogServer::run() bool immediate = true; if (strcmp(req->GetMethodName(), "domainSessionClose") == 0) { domainSessionClose(req); - } else if (strcmp(req->GetMethodName(), "domainSubscribe") == 0) { - domainSubscribe(req); } else if (strcmp(req->GetMethodName(), "domainVisit") == 0) { domainVisit(req); } else if (strcmp(req->GetMethodName(), "createDomain") == 0) { @@ -302,13 +300,6 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor) rb.ParamDesc("to", "Will erase all up and including."); rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error."); - //-- Domain Subscribe ----------------------------------------------------------- - rb.DefineMethod("domainSubscribe", "sl", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); - rb.MethodDesc("This will create a subscription. It will live till the connection is closed."); - rb.ParamDesc("name", "The name of the domain."); - rb.ParamDesc("from", "Will return all entries following(not including) <from>."); - rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error. Positive number is the sessionid"); - //-- Domain Visit ----------------------------------------------------------- rb.DefineMethod("domainVisit", "sll", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("This will create a visitor that return all operations in the range."); @@ -332,14 +323,11 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor) rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error. 1 means busy -> retry. 0 is OK."); //-- Domain Sync -- - rb.DefineMethod("domainSync", "sl", "il", true, - FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("domainSync", "sl", "il", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("Sync domain to given entry"); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("syncto", "Entry to sync to"); - rb.ReturnDesc("result", - "A resultcode(int) of the operation. " - "Negative number indicates error."); + rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error."); rb.ReturnDesc("syncedto", "Entry synced to"); } @@ -497,22 +485,6 @@ void TransLogServer::domainCommit(FRT_RPCRequest *req) } } -void TransLogServer::domainSubscribe(FRT_RPCRequest *req) -{ - uint32_t retval(uint32_t(-1)); - FRT_Values & params = *req->GetParams(); - FRT_Values & ret = *req->GetReturn(); - const char * domainName = params[0]._string._str; - LOG(debug, "domainSubscribe(%s)", domainName); - Domain::SP domain(findDomain(domainName)); - if (domain) { - SerialNum from(params[1]._intval64); - LOG(debug, "domainSubscribe(%s, %" PRIu64 ")", domainName, from); - retval = domain->subscribe(domain, from, *_supervisor, req->GetConnection()); - } - ret.AddInt32(retval); -} - void TransLogServer::domainVisit(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 225206dc0f5..8b612e4967a 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -61,7 +61,6 @@ private: void domainSessionRun(FRT_RPCRequest *req); void domainPrune(FRT_RPCRequest *req); void domainVisit(FRT_RPCRequest *req); - void domainSubscribe(FRT_RPCRequest *req); void domainSessionClose(FRT_RPCRequest *req); void domainSync(FRT_RPCRequest *req); |