diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-15 11:32:16 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-15 11:33:09 +0000 |
commit | af3f5cd865a24507271c1525c61fc97119a1ce83 (patch) | |
tree | e20169f10c1a2c1ecd6e655e309f8d9db403c58a /searchlib | |
parent | f500f24745747137bad942c0c8c40db56be6c49e (diff) |
Decouple code and reduce code visibility.
Diffstat (limited to 'searchlib')
10 files changed, 432 insertions, 358 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index a20e0cc3aaa..e6cf44cb697 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/objects/identifiable.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> +#include <vespa/document/util/bytebuffer.h> #include <vespa/fastos/file.h> #include <vespa/log/log.h> @@ -15,17 +16,24 @@ using namespace document; using namespace vespalib; using namespace std::chrono_literals; using search::index::DummyFileHeaderContext; +using search::transactionlog::client::TransLogClient; +using search::transactionlog::client::Session; +using search::transactionlog::client::Visitor; +using search::transactionlog::client::RPC; +using search::transactionlog::client::Callback; +using SessionUP = std::unique_ptr<Session>; +using VisitorUP = std::unique_ptr<Visitor>; namespace { bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); -TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); -bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); -void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); -void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); +SessionUP openDomainTest(TransLogClient & tls, const vespalib::string & name); +bool fillDomainTest(Session * s1, const vespalib::string & name); +void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries); +void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); uint32_t countFiles(const vespalib::string &dir); -void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); -bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); +void checkFilledDomainTest(const SessionUP &s1, size_t numEntries); +bool visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & name); void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains); void verifyDomain(const vespalib::string & name); @@ -43,7 +51,7 @@ myhex(const void * b, size_t sz) return s; } -class CallBackTest : public TransLogClient::Visitor::Callback +class CallBackTest : public Callback { private: virtual RPC::Result receive(const Packet & packet) override; @@ -74,7 +82,7 @@ CallBackTest::receive(const Packet & p) return RPC::OK; } -class CallBackManyTest : public TransLogClient::Visitor::Callback +class CallBackManyTest : public Callback { private: virtual RPC::Result receive(const Packet & packet) override; @@ -103,7 +111,7 @@ CallBackManyTest::receive(const Packet & p) return RPC::OK; } -class CallBackUpdate : public TransLogClient::Visitor::Callback +class CallBackUpdate : public Callback { public: typedef std::map<SerialNum, Identifiable *> PacketMap; @@ -153,7 +161,7 @@ CallBackUpdate::receive(const Packet & packet) return RPC::OK; } -class CallBackStatsTest : public TransLogClient::Session::Callback +class CallBackStatsTest : public Callback { private: virtual RPC::Result receive(const Packet & packet) override; @@ -219,8 +227,8 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre std::vector<vespalib::string> dir; tls.listDomains(dir); EXPECT_EQUAL (dir.size(), preExistingDomains); - TransLogClient::Session::UP s1 = tls.open(name); - ASSERT_TRUE (s1.get() == NULL); + SessionUP s1 = tls.open(name); + ASSERT_FALSE (s1); retval = tls.create(name); ASSERT_TRUE (retval); dir.clear(); @@ -230,16 +238,16 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre return retval; } -TransLogClient::Session::UP +SessionUP openDomainTest(TransLogClient & tls, const vespalib::string & name) { - TransLogClient::Session::UP s1 = tls.open(name); - ASSERT_TRUE (s1.get() != NULL); + SessionUP s1 = tls.open(name); + ASSERT_TRUE (s1); return s1; } bool -fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) +fillDomainTest(Session * s1, const vespalib::string & name) { bool retval(true); Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20)); @@ -279,7 +287,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) } void -fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) +fillDomainTest(Session * s1, size_t numPackets, size_t numEntries) { size_t value(0); for(size_t i=0; i < numPackets; i++) { @@ -333,7 +341,7 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP void -fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) +fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) { size_t value(0); std::vector<char> entryBuffer(entrySize); @@ -368,7 +376,7 @@ countFiles(const vespalib::string &dir) } void -checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) +checkFilledDomainTest(const SessionUP &s1, size_t numEntries) { SerialNum b(0), e(0); size_t c(0); @@ -379,7 +387,7 @@ checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) } bool -visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) +visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & name) { bool retval(true); @@ -391,7 +399,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal EXPECT_EQUAL(c, 3u); CallBackTest ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor(name, ca); + VisitorUP visitor = tls.createVisitor(name, ca); ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(0, 1) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -451,7 +459,7 @@ void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_ TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, name, preExistingDomains); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); } @@ -459,7 +467,7 @@ void verifyDomain(const vespalib::string & name) { DummyFileHeaderContext fileHeaderContext; TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); } @@ -472,7 +480,7 @@ TEST("testVisitOverGeneratedDomain") { vespalib::string name("test1"); createDomainTest(tls, name); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); EXPECT_EQUAL(0, getMaxSessionRunTime(tlss, "test1")); visitDomainTest(tls, s1.get(), name); @@ -488,7 +496,7 @@ TEST("testVisitOverPreExistingDomain") { TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); } @@ -497,8 +505,8 @@ TEST("partialUpdateTest") { TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); - TransLogClient::Session & session = *s1; + SessionUP s1 = openDomainTest(tls, "test1"); + Session & session = *s1; TestIdentifiable du; @@ -513,7 +521,7 @@ TEST("partialUpdateTest") { ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size()))); CallBackUpdate ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); + VisitorUP visitor = tls.createVisitor("test1", ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(5, 7) ); for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -522,7 +530,7 @@ TEST("partialUpdateTest") { ASSERT_TRUE( ca.hasSerial(7) ); CallBackUpdate ca1; - TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); + VisitorUP visitor1 = tls.createVisitor("test1", ca1); ASSERT_TRUE(visitor1.get()); ASSERT_TRUE( visitor1->visit(4, 5) ); for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -530,7 +538,7 @@ TEST("partialUpdateTest") { ASSERT_TRUE( ca1.map().size() == 0); CallBackUpdate ca2; - TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); + VisitorUP visitor2 = tls.createVisitor("test1", ca2); ASSERT_TRUE(visitor2.get()); ASSERT_TRUE( visitor2->visit(5, 6) ); for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -538,7 +546,7 @@ TEST("partialUpdateTest") { ASSERT_TRUE( ca2.map().size() == 0); CallBackUpdate ca3; - TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); + VisitorUP visitor3 = tls.createVisitor("test1", ca3); ASSERT_TRUE(visitor3.get()); ASSERT_TRUE( visitor3->visit(5, 1000) ); for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -562,7 +570,7 @@ TEST("testRemove") { vespalib::string name("test-delete"); createDomainTest(tls, name); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); ASSERT_TRUE(tls.remove(name)); @@ -577,7 +585,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain, uint64_t expCount, uint64_t expInOrder) { CallBackStatsTest ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor(domain, ca); + VisitorUP visitor = tls.createVisitor(domain, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(visitStart, visitEnd) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { @@ -591,9 +599,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain, } void -assertStatus(TransLogClient::Session &s, - SerialNum expFirstSerial, SerialNum expLastSerial, - uint64_t expCount) +assertStatus(Session &s, SerialNum expFirstSerial, SerialNum expLastSerial, uint64_t expCount) { SerialNum b(0), e(0); size_t c(0); @@ -618,7 +624,7 @@ TEST("test sending a lot of data") { TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, MANY, 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SessionUP s1 = openDomainTest(tls, MANY); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); @@ -627,7 +633,7 @@ TEST("test sending a lot of data") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); + VisitorUP visitor = tls.createVisitor("many", ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -640,7 +646,7 @@ TEST("test sending a lot of data") { TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); + SessionUP s1 = openDomainTest(tls, "many"); SerialNum b(0), e(0); size_t c(0); EXPECT_TRUE(s1->status(b, e, c)); @@ -648,7 +654,7 @@ TEST("test sending a lot of data") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + VisitorUP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -661,7 +667,7 @@ TEST("test sending a lot of data") { TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SessionUP s1 = openDomainTest(tls, MANY); SerialNum b(0), e(0); size_t c(0); EXPECT_TRUE(s1->status(b, e, c)); @@ -669,7 +675,7 @@ TEST("test sending a lot of data") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + VisitorUP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -690,7 +696,7 @@ TEST("test sending a lot of data async") { .setChunkAgeLimit(10ms)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, MANY, 1); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SessionUP s1 = openDomainTest(tls, MANY); fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); @@ -699,7 +705,7 @@ TEST("test sending a lot of data async") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + VisitorUP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -712,7 +718,7 @@ TEST("test sending a lot of data async") { TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SessionUP s1 = openDomainTest(tls, MANY); SerialNum b(0), e(0); size_t c(0); EXPECT_TRUE(s1->status(b, e, c)); @@ -720,7 +726,7 @@ TEST("test sending a lot of data async") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + VisitorUP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -743,7 +749,7 @@ TEST("testErase") { TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); + SessionUP s1 = openDomainTest(tls, "erase"); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); } { @@ -751,7 +757,7 @@ TEST("testErase") { TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); + SessionUP s1 = openDomainTest(tls, "erase"); // Before erase TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, @@ -839,7 +845,7 @@ TEST("testSync") { TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + SessionUP s1 = openDomainTest(tls, "sync"); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); SerialNum syncedTo(0); @@ -861,7 +867,7 @@ TEST("test truncate on version mismatch") { TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + SessionUP s1 = openDomainTest(tls, "sync"); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); EXPECT_TRUE(s1->status(fromOld, toOld, countOld)); SerialNum syncedTo(0); @@ -880,7 +886,7 @@ TEST("test truncate on version mismatch") { { TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + SessionUP s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); size_t count(0); EXPECT_TRUE(s1->status(from, to, count)); @@ -906,7 +912,7 @@ TEST("test truncation after short read") { TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + SessionUP s1 = openDomainTest(tls, domain); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES, ENTRYSIZE); SerialNum syncedTo(0); @@ -920,7 +926,7 @@ TEST("test truncation after short read") { { TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); - TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + SessionUP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); } { @@ -929,14 +935,14 @@ TEST("test truncation after short read") { { vespalib::string filename(dir + "/truncate-0000000000000017"); FastOS_File trfile(filename.c_str()); - EXPECT_TRUE(trfile.OpenReadWrite(NULL)); + EXPECT_TRUE(trfile.OpenReadWrite(nullptr)); trfile.SetSize(trfile.getSize() - 1); trfile.Close(); } { TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); - TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + SessionUP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); } { diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 925f297bf48..a516fb26604 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -27,8 +27,10 @@ using search::index::DummyFileHeaderContext; namespace search::transactionlog { -using ClientSession = TransLogClient::Session; -using Visitor = TransLogClient::Visitor; +using ClientSession = client::Session; +using client::Visitor; +using client::TransLogClient; +using client::RPC; //----------------------------------------------------------------------------- // BufferGenerator @@ -287,7 +289,7 @@ FeederThread::doRun() //----------------------------------------------------------------------------- // Agent //----------------------------------------------------------------------------- -class Agent : public ClientSession::Callback +class Agent : public client::Callback { protected: std::string _tlsSpec; @@ -301,12 +303,13 @@ protected: public: Agent(const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, const std::string & name, uint32_t id, bool validate) : - ClientSession::Callback(), + client::Callback(), _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec), - _generator(generator), _name(name), _id(id), _validate(validate) {} - virtual ~Agent() {} - virtual RPC::Result receive(const Packet & packet) override = 0; - virtual void eof() override {} + _generator(generator), _name(name), _id(id), _validate(validate) + {} + ~Agent() override {} + RPC::Result receive(const Packet & packet) override = 0; + void eof() override {} virtual void failed() {} }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index 5dca84a26c1..6ce7e652326 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -2,6 +2,7 @@ vespa_add_library(searchlib_transactionlog OBJECT SOURCES chunks.cpp + client_session.cpp common.cpp domain.cpp domainconfig.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_common.h b/searchlib/src/vespa/searchlib/transactionlog/client_common.h new file mode 100644 index 00000000000..05bb30ff368 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/client_common.h @@ -0,0 +1,20 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace search::transactionlog { class Packet; } +namespace search::transactionlog::client { + +class RPC +{ +public: +enum Result { OK, FULL, ERROR }; +}; + +class Callback { +public: + virtual ~Callback() = default; + virtual RPC::Result receive(const Packet & packet) = 0; + virtual void eof() { } +}; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp new file mode 100644 index 00000000000..8678d88b43c --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp @@ -0,0 +1,200 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "client_session.h" +#include "translogclient.h" +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/supervisor.h> +#include <thread> + +#include <vespa/log/log.h> +LOG_SETUP(".translog.client_session"); + +using vespalib::LockGuard; +using namespace std::chrono_literals; + +namespace search::transactionlog::client { + +SessionKey::SessionKey(const vespalib::string & domain, int sessionId) + : _domain(domain), + _sessionId(sessionId) +{ } +SessionKey::~SessionKey() = default; + +int +SessionKey::cmp(const SessionKey & b) const +{ + int diff(strcmp(_domain.c_str(), b._domain.c_str())); + if (diff == 0) { + diff = _sessionId - b._sessionId; + } + return diff; +} + +Session::Session(const vespalib::string & domain, TransLogClient & tlc) + : _tlc(tlc), + _domain(domain), + _sessionId(0) +{ +} + +Session::~Session() +{ + close(); + clear(); +} + +bool +Session::commit(const vespalib::ConstBufferRef & buf) +{ + bool retval(true); + if (buf.size() != 0) { + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainCommit"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddData(buf.c_str(), buf.size()); + int retcode = _tlc.rpc(req); + retval = (retcode == 0); + if (retval) { + req->SubRef(); + } else { + vespalib::string msg; + if (req->GetReturn() != nullptr) { + msg = req->GetReturn()->GetValue(1)._string._str; + } else { + msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); + } + req->SubRef(); + throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str())); + } + } + return retval; +} + +bool +Session::status(SerialNum & b, SerialNum & e, size_t & count) +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainStatus"); + req->GetParams()->AddString(_domain.c_str()); + int32_t retval(_tlc.rpc(req)); + if (retval == 0) { + b = req->GetReturn()->GetValue(1)._intval64; + e = req->GetReturn()->GetValue(2)._intval64; + count = req->GetReturn()->GetValue(3)._intval64; + } + req->SubRef(); + return (retval == 0); +} + +bool +Session::erase(const SerialNum & to) +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainPrune"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddInt64(to); + int32_t retval(_tlc.rpc(req)); + req->SubRef(); + if (retval == 1) { + LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to); + } + return (retval == 0); +} + + +bool +Session::sync(const SerialNum &syncTo, SerialNum &syncedTo) +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainSync"); + FRT_Values & params = *req->GetParams(); + params.AddString(_domain.c_str()); + params.AddInt64(syncTo); + int32_t retval(_tlc.rpc(req)); + if (retval == 0) { + syncedTo = req->GetReturn()->GetValue(1)._intval64; + } + req->SubRef(); + return (retval == 0); +} + + +void +Session::clear() +{ + if (_sessionId > 0) { + LockGuard guard(_tlc._lock); + _tlc._sessions.erase(SessionKey(_domain, _sessionId)); + } + _sessionId = 0; +} + +Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) : + Session(domain, tlc), + _callback(callBack) +{ +} + +bool +Session::init(FRT_RPCRequest *req) +{ + int32_t retval(_tlc.rpc(req)); + req->SubRef(); + if (retval > 0) { + clear(); + _sessionId = retval; + SessionKey key(_domain, _sessionId); + { + LockGuard guard(_tlc._lock); + _tlc._sessions[key] = this; + } + retval = run(); + } + return (retval > 0); +} + +bool +Visitor::visit(const SerialNum & from, const SerialNum & to) +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainVisit"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddInt64(from); + req->GetParams()->AddInt64(to); + return init(req); +} + +bool +Session::run() +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainSessionRun"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddInt32(_sessionId); + int32_t retval(_tlc.rpc(req)); + req->SubRef(); + return (retval == 0); +} + +bool +Session::close() +{ + int retval(0); + if (_sessionId > 0) { + do { + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainSessionClose"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddInt32(_sessionId); + if ( (retval = _tlc.rpc(req)) > 0) { + std::this_thread::sleep_for(10ms); + } + req->SubRef(); + } while ( retval == 1 ); + } + return (retval == 0); +} + +Visitor::~Visitor() = default; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.h b/searchlib/src/vespa/searchlib/transactionlog/client_session.h new file mode 100644 index 00000000000..49f24d83aaf --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.h @@ -0,0 +1,68 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "client_common.h" +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/util/buffer.h> +#include <vespa/vespalib/stllike/string.h> + +class FRT_RPCRequest; + +namespace search::transactionlog::client { + +class TransLogClient; + +class SessionKey +{ +public: + SessionKey(const vespalib::string & domain, int sessionId); + ~SessionKey(); + bool operator < (const SessionKey & b) const { return cmp(b) < 0; } +private: + int cmp(const SessionKey & b) const; + vespalib::string _domain; + int _sessionId; +}; + +class Session +{ +public: + Session(const vespalib::string & domain, TransLogClient & tlc); + virtual ~Session(); + /// You can commit data of any registered type to any channel. + bool commit(const vespalib::ConstBufferRef & packet); + /// Will erase all entries prior to <to> + bool erase(const SerialNum & to); + bool status(SerialNum & b, SerialNum & e, size_t & count); + + bool sync(const SerialNum &syncTo, SerialNum &syncedTo); + + virtual RPC::Result visit(const Packet & ) { return RPC::OK; } + virtual void eof() { } + bool close(); + void clear(); + const vespalib::string & getDomain() const { return _domain; } + const TransLogClient & getTLC() const { return _tlc; } +protected: + bool init(FRT_RPCRequest * req); + bool run(); + TransLogClient & _tlc; + vespalib::string _domain; + int _sessionId; +}; + +/// Here you connect to the incomming data getting everything from <from> +class Visitor : public Session +{ +public: + Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack); + bool visit(const SerialNum & from, const SerialNum & to); + virtual ~Visitor() override; + RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); } + void eof() override { _callback.eof(); } +private: + Callback & _callback; +}; + +} + diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index c5427c5b401..8dba5d448f8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -10,14 +10,7 @@ namespace search::transactionlog { /// This represents a type of the entry. Fx update,remove -typedef uint32_t Type; -/// A channel represents one data stream. - -class RPC -{ -public: -enum Result { OK, FULL, ERROR }; -}; +using Type = uint32_t; class SerialNumRange { diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp index 2c6c1e249f4..84919a59a97 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp @@ -1,12 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogclient.h" +#include "common.h" #include <vespa/vespalib/util/stringfmt.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/target.h> #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fnet/transport.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fastos/thread.h> -#include <thread> + #include <vespa/log/log.h> LOG_SETUP(".translogclient"); @@ -15,7 +17,7 @@ using namespace std::chrono_literals; VESPA_THREAD_STACK_TAG(translogclient_rpc_callback) -namespace search::transactionlog { +namespace search::transactionlog::client { namespace { const double NEVER(-1.0); @@ -33,7 +35,7 @@ struct RpcTask : public vespalib::Executor::Task { req->Return(); req = nullptr; } - ~RpcTask() { + ~RpcTask() override { if (req != nullptr) { req->SetError(FRTE_RPC_METHOD_FAILED, "client has been shut down"); req->Return(); @@ -46,13 +48,13 @@ struct RpcTask : public vespalib::Executor::Task { using vespalib::LockGuard; TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : - _executor(1, 128 * 1024, translogclient_rpc_callback), + _executor(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, translogclient_rpc_callback)), _rpcTarget(rpcTarget), _sessions(), _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), _transport(std::make_unique<FNET_Transport>()), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), - _target(NULL) + _target(nullptr) { reconnect(); exportRPC(*_supervisor); @@ -62,29 +64,33 @@ TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : TransLogClient::~TransLogClient() { disconnect(); - _executor.shutdown().sync(); + _executor->shutdown().sync(); _transport->ShutDown(true); } -bool TransLogClient::reconnect() +bool +TransLogClient::reconnect() { disconnect(); _target = _supervisor->Get2WayTarget(_rpcTarget.c_str()); return isConnected(); } -bool TransLogClient::isConnected() const { - return (_target != NULL) && _target->IsValid(); +bool +TransLogClient::isConnected() const { + return (_target != nullptr) && _target->IsValid(); } -void TransLogClient::disconnect() +void +TransLogClient::disconnect() { if (_target) { _target->SubRef(); } } -bool TransLogClient::create(const vespalib::string & domain) +bool +TransLogClient::create(const vespalib::string & domain) { FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("createDomain"); @@ -94,7 +100,8 @@ bool TransLogClient::create(const vespalib::string & domain) return (retval == 0); } -bool TransLogClient::remove(const vespalib::string & domain) +bool +TransLogClient::remove(const vespalib::string & domain) { FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("deleteDomain"); @@ -104,27 +111,28 @@ bool TransLogClient::remove(const vespalib::string & domain) return (retval == 0); } -TransLogClient::Session::UP TransLogClient::open(const vespalib::string & domain) +std::unique_ptr<Session> +TransLogClient::open(const vespalib::string & domain) { - Session::UP session; FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("openDomain"); req->GetParams()->AddString(domain.c_str()); int32_t retval(rpc(req)); + req->SubRef(); if (retval == 0) { - session.reset(new Session(domain, *this)); + return std::make_unique<Session>(domain, *this); } - req->SubRef(); - return session; + return std::unique_ptr<Session>(); } -TransLogClient::Visitor::UP -TransLogClient::createVisitor(const vespalib::string & domain, TransLogClient::Session::Callback & callBack) +std::unique_ptr<Visitor> +TransLogClient::createVisitor(const vespalib::string & domain, Callback & callBack) { - return TransLogClient::Visitor::UP(new Visitor(domain, *this, callBack)); + return std::make_unique<Visitor>(domain, *this, callBack); } -bool TransLogClient::listDomains(std::vector<vespalib::string> & dir) +bool +TransLogClient::listDomains(std::vector<vespalib::string> & dir) { FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("listDomains"); @@ -139,7 +147,8 @@ bool TransLogClient::listDomains(std::vector<vespalib::string> & dir) return (retval == 0); } -int32_t TransLogClient::rpc(FRT_RPCRequest * req) +int32_t +TransLogClient::rpc(FRT_RPCRequest * req) { int32_t retval(-7); if (_target) { @@ -156,15 +165,17 @@ int32_t TransLogClient::rpc(FRT_RPCRequest * req) return retval; } -TransLogClient::Session * TransLogClient::findSession(const vespalib::string & domainName, int sessionId) +Session * +TransLogClient::findSession(const vespalib::string & domainName, int sessionId) { SessionKey key(domainName, sessionId); SessionMap::iterator found(_sessions.find(key)); - Session * session((found != _sessions.end()) ? found->second : NULL); + Session * session((found != _sessions.end()) ? found->second : nullptr); return session; } -void TransLogClient::exportRPC(FRT_Supervisor & supervisor) +void +TransLogClient::exportRPC(FRT_Supervisor & supervisor) { FRT_ReflectionBuilder rb( & supervisor); @@ -185,7 +196,8 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor) } -void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) +void +TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -194,7 +206,7 @@ void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) int32_t sessionId(params[1]._intval32); LOG(spam, "visitCallback(%s, %d)(%d)", domainName, sessionId, params[2]._data._len); Session * session(findSession(domainName, sessionId)); - if (session != NULL) { + if (session != nullptr) { Packet packet(params[2]._data._buf, params[2]._data._len); retval = session->visit(packet); } @@ -202,7 +214,8 @@ void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) LOG(debug, "visitCallback(%s, %d)=%d done", domainName, sessionId, retval); } -void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) +void +TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -211,7 +224,7 @@ void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) int32_t sessionId(params[1]._intval32); LOG(debug, "eofCallback(%s, %d)", domainName, sessionId); Session * session(findSession(domainName, sessionId)); - if (session != NULL) { + if (session != nullptr) { session->eof(); retval = 0; } @@ -219,183 +232,16 @@ void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) LOG(debug, "eofCallback(%s, %d)=%d done", domainName, sessionId, retval); } -void TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req) -{ - _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); })); -} - -void TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req) -{ - _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); })); -} - - -TransLogClient::Session::Session(const vespalib::string & domain, TransLogClient & tlc) : - _tlc(tlc), - _domain(domain), - _sessionId(0) -{ -} - -TransLogClient::Session::~Session() -{ - close(); - clear(); -} - -bool TransLogClient::Session::commit(const vespalib::ConstBufferRef & buf) -{ - bool retval(true); - if (buf.size() != 0) { - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainCommit"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddData(buf.c_str(), buf.size()); - int retcode = _tlc.rpc(req); - retval = (retcode == 0); - if (retval) { - req->SubRef(); - } else { - vespalib::string msg; - if (req->GetReturn() != 0) { - msg = req->GetReturn()->GetValue(1)._string._str; - } else { - msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); - } - req->SubRef(); - throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str())); - } - } - return retval; -} - -bool TransLogClient::Session::status(SerialNum & b, SerialNum & e, size_t & count) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainStatus"); - req->GetParams()->AddString(_domain.c_str()); - int32_t retval(_tlc.rpc(req)); - if (retval == 0) { - b = req->GetReturn()->GetValue(1)._intval64; - e = req->GetReturn()->GetValue(2)._intval64; - count = req->GetReturn()->GetValue(3)._intval64; - } - req->SubRef(); - return (retval == 0); -} - -bool TransLogClient::Session::erase(const SerialNum & to) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainPrune"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt64(to); - int32_t retval(_tlc.rpc(req)); - req->SubRef(); - if (retval == 1) { - LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to); - } - return (retval == 0); -} - - -bool -TransLogClient::Session::sync(const SerialNum &syncTo, SerialNum &syncedTo) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainSync"); - FRT_Values & params = *req->GetParams(); - params.AddString(_domain.c_str()); - params.AddInt64(syncTo); - int32_t retval(_tlc.rpc(req)); - if (retval == 0) { - syncedTo = req->GetReturn()->GetValue(1)._intval64; - } - req->SubRef(); - return (retval == 0); -} - - -void TransLogClient::Session::clear() -{ - if (_sessionId > 0) { - LockGuard guard(_tlc._lock); - _tlc._sessions.erase(SessionKey(_domain, _sessionId)); - } - _sessionId = 0; -} - -int TransLogClient::SessionKey::cmp(const TransLogClient::SessionKey & b) const +void +TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req) { - int diff(strcmp(_domain.c_str(), b._domain.c_str())); - if (diff == 0) { - diff = _sessionId - b._sessionId; - } - return diff; + _executor->execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); })); } -TransLogClient::Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) : - Session(domain, tlc), - _callback(callBack) +void +TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req) { + _executor->execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); })); } -bool TransLogClient::Session::init(FRT_RPCRequest *req) -{ - int32_t retval(_tlc.rpc(req)); - req->SubRef(); - if (retval > 0) { - clear(); - _sessionId = retval; - SessionKey key(_domain, _sessionId); - { - LockGuard guard(_tlc._lock); - _tlc._sessions[key] = this; - } - retval = run(); - } - return (retval > 0); -} - -bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainVisit"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt64(from); - req->GetParams()->AddInt64(to); - return init(req); -} - -bool TransLogClient::Session::run() -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainSessionRun"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt32(_sessionId); - int32_t retval(_tlc.rpc(req)); - req->SubRef(); - return (retval == 0); -} - -bool TransLogClient::Session::close() -{ - int retval(0); - if (_sessionId > 0) { - do { - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainSessionClose"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt32(_sessionId); - if ( (retval = _tlc.rpc(req)) > 0) { - std::this_thread::sleep_for(10ms); - } - req->SubRef(); - } while ( retval == 1 ); - } - return (retval == 0); -} - -TransLogClient::Visitor::~Visitor() = default; - } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h index 38c30cd5b4c..289a0fcb8c0 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h @@ -1,11 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "common.h" -#include <vespa/document/util/bytebuffer.h> +#include "client_common.h" +#include "client_session.h" #include <vespa/vespalib/util/sync.h> -#include <vespa/vespalib/util/buffer.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fnet/frt/invokable.h> #include <map> #include <vector> @@ -13,90 +11,39 @@ class FNET_Transport; class FRT_Supervisor; class FRT_Target; +class FastOS_ThreadPool; -namespace search::transactionlog { +namespace vespalib { class ThreadStackExecutorBase; } +namespace search::transactionlog::client { + +class Session; +class Visitor; class TransLogClient : private FRT_Invokable { -private: - TransLogClient(const TransLogClient &); - TransLogClient& operator=(const TransLogClient &); - public: - class Session - { - public: - class Callback { - public: - virtual ~Callback() { } - virtual RPC::Result receive(const Packet & packet) = 0; - virtual void eof() { } - }; - public: - typedef std::unique_ptr<Session> UP; - typedef std::shared_ptr<Session> SP; - - Session(const vespalib::string & domain, TransLogClient & tlc); - virtual ~Session(); - /// You can commit data of any registered type to any channel. - bool commit(const vespalib::ConstBufferRef & packet); - /// Will erase all entries prior to <to> - bool erase(const SerialNum & to); - bool status(SerialNum & b, SerialNum & e, size_t & count); - - bool sync(const SerialNum &syncTo, SerialNum &syncedTo); - - virtual RPC::Result visit(const Packet & ) { return RPC::OK; } - virtual void eof() { } - bool close(); - void clear(); - const vespalib::string & getDomain() const { return _domain; } - const TransLogClient & getTLC() const { return _tlc; } - protected: - bool init(FRT_RPCRequest * req); - bool run(); - TransLogClient & _tlc; - vespalib::string _domain; - int _sessionId; - }; - /// Here you connect to the incomming data getting everything from <from> - class Visitor : public Session - { - public: - typedef std::unique_ptr<Visitor> UP; - typedef std::shared_ptr<Visitor> SP; - - Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack); - bool visit(const SerialNum & from, const SerialNum & to); - virtual ~Visitor(); - RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); } - void eof() override { _callback.eof(); } - private: - Callback & _callback; - }; - /// Here you read the incomming data getting everything from <from> - -public: - typedef std::unique_ptr<TransLogClient> UP; - TransLogClient(const vespalib::string & rpctarget); - virtual ~TransLogClient(); + TransLogClient(const TransLogClient &) = delete; + TransLogClient& operator=(const TransLogClient &) = delete; + ~TransLogClient() override; /// Here you create a new domain bool create(const vespalib::string & domain); /// Here you remove a domain bool remove(const vespalib::string & domain); /// Here you open an existing domain - Session::UP open(const vespalib::string & domain); + std::unique_ptr<Session> open(const vespalib::string & domain); /// Here you can get a list of available domains. bool listDomains(std::vector<vespalib::string> & dir); - Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack); + std::unique_ptr<Visitor> createVisitor(const vespalib::string & domain, Callback & callBack); bool isConnected() const; void disconnect(); bool reconnect(); const vespalib::string &getRPCTarget() const { return _rpcTarget; } private: + friend Session; + friend Visitor; void exportRPC(FRT_Supervisor & supervisor); void do_visitCallbackRPC(FRT_RPCRequest *req); void do_eofCallbackRPC(FRT_RPCRequest *req); @@ -105,22 +52,11 @@ private: int32_t rpc(FRT_RPCRequest * req); Session * findSession(const vespalib::string & domain, int sessionId); - class SessionKey - { - public: - SessionKey(const vespalib::string & domain, int sessionId) : _domain(domain), _sessionId(sessionId) { } - bool operator < (const SessionKey & b) const { return cmp(b) < 0; } - private: - int cmp(const SessionKey & b) const; - vespalib::string _domain; - int _sessionId; - }; - - typedef std::map< SessionKey, Session * > SessionMap; + using SessionMap = std::map< SessionKey, Session * >; - vespalib::ThreadStackExecutor _executor; - vespalib::string _rpcTarget; - SessionMap _sessions; + std::unique_ptr<vespalib::ThreadStackExecutorBase> _executor; + vespalib::string _rpcTarget; + SessionMap _sessions; //Brute force lock for subscriptions. For multithread safety. vespalib::Lock _lock; std::unique_ptr<FastOS_ThreadPool> _threadPool; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index a72731b661e..64e472520a5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" #include "domain.h" +#include "client_common.h" #include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> @@ -399,12 +400,12 @@ public: private: bool send(FRT_RPCRequest * req) { int32_t retval = rpc(req); - if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { + if ( ! ((retval == client::RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); } req->SubRef(); - return (retval == RPC::OK); + return (retval == client::RPC::OK); } int32_t rpc(FRT_RPCRequest * req) { int32_t retval(-7); |