aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-15 11:32:16 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-15 11:33:09 +0000
commitaf3f5cd865a24507271c1525c61fc97119a1ce83 (patch)
treee20169f10c1a2c1ecd6e655e309f8d9db403c58a /searchlib
parentf500f24745747137bad942c0c8c40db56be6c49e (diff)
Decouple code and reduce code visibility.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp116
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp19
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt1
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/client_common.h20
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/client_session.cpp200
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/client_session.h68
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h9
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp250
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.h102
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp5
10 files changed, 432 insertions, 358 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index a20e0cc3aaa..e6cf44cb697 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/objects/identifiable.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
+#include <vespa/document/util/bytebuffer.h>
#include <vespa/fastos/file.h>
#include <vespa/log/log.h>
@@ -15,17 +16,24 @@ using namespace document;
using namespace vespalib;
using namespace std::chrono_literals;
using search::index::DummyFileHeaderContext;
+using search::transactionlog::client::TransLogClient;
+using search::transactionlog::client::Session;
+using search::transactionlog::client::Visitor;
+using search::transactionlog::client::RPC;
+using search::transactionlog::client::Callback;
+using SessionUP = std::unique_ptr<Session>;
+using VisitorUP = std::unique_ptr<Visitor>;
namespace {
bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0);
-TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name);
-bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name);
-void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries);
-void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
+SessionUP openDomainTest(TransLogClient & tls, const vespalib::string & name);
+bool fillDomainTest(Session * s1, const vespalib::string & name);
+void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries);
+void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
uint32_t countFiles(const vespalib::string &dir);
-void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
-bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
+void checkFilledDomainTest(const SessionUP &s1, size_t numEntries);
+bool visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & name);
void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains);
void verifyDomain(const vespalib::string & name);
@@ -43,7 +51,7 @@ myhex(const void * b, size_t sz)
return s;
}
-class CallBackTest : public TransLogClient::Visitor::Callback
+class CallBackTest : public Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
@@ -74,7 +82,7 @@ CallBackTest::receive(const Packet & p)
return RPC::OK;
}
-class CallBackManyTest : public TransLogClient::Visitor::Callback
+class CallBackManyTest : public Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
@@ -103,7 +111,7 @@ CallBackManyTest::receive(const Packet & p)
return RPC::OK;
}
-class CallBackUpdate : public TransLogClient::Visitor::Callback
+class CallBackUpdate : public Callback
{
public:
typedef std::map<SerialNum, Identifiable *> PacketMap;
@@ -153,7 +161,7 @@ CallBackUpdate::receive(const Packet & packet)
return RPC::OK;
}
-class CallBackStatsTest : public TransLogClient::Session::Callback
+class CallBackStatsTest : public Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
@@ -219,8 +227,8 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre
std::vector<vespalib::string> dir;
tls.listDomains(dir);
EXPECT_EQUAL (dir.size(), preExistingDomains);
- TransLogClient::Session::UP s1 = tls.open(name);
- ASSERT_TRUE (s1.get() == NULL);
+ SessionUP s1 = tls.open(name);
+ ASSERT_FALSE (s1);
retval = tls.create(name);
ASSERT_TRUE (retval);
dir.clear();
@@ -230,16 +238,16 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre
return retval;
}
-TransLogClient::Session::UP
+SessionUP
openDomainTest(TransLogClient & tls, const vespalib::string & name)
{
- TransLogClient::Session::UP s1 = tls.open(name);
- ASSERT_TRUE (s1.get() != NULL);
+ SessionUP s1 = tls.open(name);
+ ASSERT_TRUE (s1);
return s1;
}
bool
-fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
+fillDomainTest(Session * s1, const vespalib::string & name)
{
bool retval(true);
Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20));
@@ -279,7 +287,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
}
void
-fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries)
+fillDomainTest(Session * s1, size_t numPackets, size_t numEntries)
{
size_t value(0);
for(size_t i=0; i < numPackets; i++) {
@@ -333,7 +341,7 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP
void
-fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
+fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
{
size_t value(0);
std::vector<char> entryBuffer(entrySize);
@@ -368,7 +376,7 @@ countFiles(const vespalib::string &dir)
}
void
-checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
+checkFilledDomainTest(const SessionUP &s1, size_t numEntries)
{
SerialNum b(0), e(0);
size_t c(0);
@@ -379,7 +387,7 @@ checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
}
bool
-visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name)
+visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & name)
{
bool retval(true);
@@ -391,7 +399,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal
EXPECT_EQUAL(c, 3u);
CallBackTest ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor(name, ca);
+ VisitorUP visitor = tls.createVisitor(name, ca);
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 1) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -451,7 +459,7 @@ void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, name, preExistingDomains);
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
}
@@ -459,7 +467,7 @@ void verifyDomain(const vespalib::string & name) {
DummyFileHeaderContext fileHeaderContext;
TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
visitDomainTest(tls, s1.get(), name);
}
@@ -472,7 +480,7 @@ TEST("testVisitOverGeneratedDomain") {
vespalib::string name("test1");
createDomainTest(tls, name);
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
EXPECT_EQUAL(0, getMaxSessionRunTime(tlss, "test1"));
visitDomainTest(tls, s1.get(), name);
@@ -488,7 +496,7 @@ TEST("testVisitOverPreExistingDomain") {
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test1");
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
visitDomainTest(tls, s1.get(), name);
}
@@ -497,8 +505,8 @@ TEST("partialUpdateTest") {
TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
- TransLogClient::Session & session = *s1;
+ SessionUP s1 = openDomainTest(tls, "test1");
+ Session & session = *s1;
TestIdentifiable du;
@@ -513,7 +521,7 @@ TEST("partialUpdateTest") {
ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size())));
CallBackUpdate ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca);
+ VisitorUP visitor = tls.createVisitor("test1", ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(5, 7) );
for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -522,7 +530,7 @@ TEST("partialUpdateTest") {
ASSERT_TRUE( ca.hasSerial(7) );
CallBackUpdate ca1;
- TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1);
+ VisitorUP visitor1 = tls.createVisitor("test1", ca1);
ASSERT_TRUE(visitor1.get());
ASSERT_TRUE( visitor1->visit(4, 5) );
for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -530,7 +538,7 @@ TEST("partialUpdateTest") {
ASSERT_TRUE( ca1.map().size() == 0);
CallBackUpdate ca2;
- TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2);
+ VisitorUP visitor2 = tls.createVisitor("test1", ca2);
ASSERT_TRUE(visitor2.get());
ASSERT_TRUE( visitor2->visit(5, 6) );
for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -538,7 +546,7 @@ TEST("partialUpdateTest") {
ASSERT_TRUE( ca2.map().size() == 0);
CallBackUpdate ca3;
- TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3);
+ VisitorUP visitor3 = tls.createVisitor("test1", ca3);
ASSERT_TRUE(visitor3.get());
ASSERT_TRUE( visitor3->visit(5, 1000) );
for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -562,7 +570,7 @@ TEST("testRemove") {
vespalib::string name("test-delete");
createDomainTest(tls, name);
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
ASSERT_TRUE(tls.remove(name));
@@ -577,7 +585,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain,
uint64_t expCount, uint64_t expInOrder)
{
CallBackStatsTest ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor(domain, ca);
+ VisitorUP visitor = tls.createVisitor(domain, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(visitStart, visitEnd) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) {
@@ -591,9 +599,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain,
}
void
-assertStatus(TransLogClient::Session &s,
- SerialNum expFirstSerial, SerialNum expLastSerial,
- uint64_t expCount)
+assertStatus(Session &s, SerialNum expFirstSerial, SerialNum expLastSerial, uint64_t expCount)
{
SerialNum b(0), e(0);
size_t c(0);
@@ -618,7 +624,7 @@ TEST("test sending a lot of data") {
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, MANY, 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SessionUP s1 = openDomainTest(tls, MANY);
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
SerialNum b(0), e(0);
size_t c(0);
@@ -627,7 +633,7 @@ TEST("test sending a lot of data") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca);
+ VisitorUP visitor = tls.createVisitor("many", ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -640,7 +646,7 @@ TEST("test sending a lot of data") {
TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
+ SessionUP s1 = openDomainTest(tls, "many");
SerialNum b(0), e(0);
size_t c(0);
EXPECT_TRUE(s1->status(b, e, c));
@@ -648,7 +654,7 @@ TEST("test sending a lot of data") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ VisitorUP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -661,7 +667,7 @@ TEST("test sending a lot of data") {
TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SessionUP s1 = openDomainTest(tls, MANY);
SerialNum b(0), e(0);
size_t c(0);
EXPECT_TRUE(s1->status(b, e, c));
@@ -669,7 +675,7 @@ TEST("test sending a lot of data") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ VisitorUP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -690,7 +696,7 @@ TEST("test sending a lot of data async") {
.setChunkAgeLimit(10ms));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, MANY, 1);
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SessionUP s1 = openDomainTest(tls, MANY);
fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES);
SerialNum b(0), e(0);
size_t c(0);
@@ -699,7 +705,7 @@ TEST("test sending a lot of data async") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ VisitorUP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -712,7 +718,7 @@ TEST("test sending a lot of data async") {
TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SessionUP s1 = openDomainTest(tls, MANY);
SerialNum b(0), e(0);
size_t c(0);
EXPECT_TRUE(s1->status(b, e, c));
@@ -720,7 +726,7 @@ TEST("test sending a lot of data async") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ VisitorUP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -743,7 +749,7 @@ TEST("testErase") {
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "erase", 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, "erase");
+ SessionUP s1 = openDomainTest(tls, "erase");
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
}
{
@@ -751,7 +757,7 @@ TEST("testErase") {
TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, "erase");
+ SessionUP s1 = openDomainTest(tls, "erase");
// Before erase
TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES,
@@ -839,7 +845,7 @@ TEST("testSync") {
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
+ SessionUP s1 = openDomainTest(tls, "sync");
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
SerialNum syncedTo(0);
@@ -861,7 +867,7 @@ TEST("test truncate on version mismatch") {
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
+ SessionUP s1 = openDomainTest(tls, "sync");
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
EXPECT_TRUE(s1->status(fromOld, toOld, countOld));
SerialNum syncedTo(0);
@@ -880,7 +886,7 @@ TEST("test truncate on version mismatch") {
{
TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
+ SessionUP s1 = openDomainTest(tls, "sync");
uint64_t from(0), to(0);
size_t count(0);
EXPECT_TRUE(s1->status(from, to, count));
@@ -906,7 +912,7 @@ TEST("test truncation after short read") {
TransLogClient tls(tlsspec);
createDomainTest(tls, domain, 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
+ SessionUP s1 = openDomainTest(tls, domain);
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES, ENTRYSIZE);
SerialNum syncedTo(0);
@@ -920,7 +926,7 @@ TEST("test truncation after short read") {
{
TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
- TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
+ SessionUP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES);
}
{
@@ -929,14 +935,14 @@ TEST("test truncation after short read") {
{
vespalib::string filename(dir + "/truncate-0000000000000017");
FastOS_File trfile(filename.c_str());
- EXPECT_TRUE(trfile.OpenReadWrite(NULL));
+ EXPECT_TRUE(trfile.OpenReadWrite(nullptr));
trfile.SetSize(trfile.getSize() - 1);
trfile.Close();
}
{
TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
- TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
+ SessionUP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1);
}
{
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index 925f297bf48..a516fb26604 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -27,8 +27,10 @@ using search::index::DummyFileHeaderContext;
namespace search::transactionlog {
-using ClientSession = TransLogClient::Session;
-using Visitor = TransLogClient::Visitor;
+using ClientSession = client::Session;
+using client::Visitor;
+using client::TransLogClient;
+using client::RPC;
//-----------------------------------------------------------------------------
// BufferGenerator
@@ -287,7 +289,7 @@ FeederThread::doRun()
//-----------------------------------------------------------------------------
// Agent
//-----------------------------------------------------------------------------
-class Agent : public ClientSession::Callback
+class Agent : public client::Callback
{
protected:
std::string _tlsSpec;
@@ -301,12 +303,13 @@ protected:
public:
Agent(const std::string & tlsSpec, const std::string & domain,
const EntryGenerator & generator, const std::string & name, uint32_t id, bool validate) :
- ClientSession::Callback(),
+ client::Callback(),
_tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec),
- _generator(generator), _name(name), _id(id), _validate(validate) {}
- virtual ~Agent() {}
- virtual RPC::Result receive(const Packet & packet) override = 0;
- virtual void eof() override {}
+ _generator(generator), _name(name), _id(id), _validate(validate)
+ {}
+ ~Agent() override {}
+ RPC::Result receive(const Packet & packet) override = 0;
+ void eof() override {}
virtual void failed() {}
};
diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
index 5dca84a26c1..6ce7e652326 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
@@ -2,6 +2,7 @@
vespa_add_library(searchlib_transactionlog OBJECT
SOURCES
chunks.cpp
+ client_session.cpp
common.cpp
domain.cpp
domainconfig.cpp
diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_common.h b/searchlib/src/vespa/searchlib/transactionlog/client_common.h
new file mode 100644
index 00000000000..05bb30ff368
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/client_common.h
@@ -0,0 +1,20 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace search::transactionlog { class Packet; }
+namespace search::transactionlog::client {
+
+class RPC
+{
+public:
+enum Result { OK, FULL, ERROR };
+};
+
+class Callback {
+public:
+ virtual ~Callback() = default;
+ virtual RPC::Result receive(const Packet & packet) = 0;
+ virtual void eof() { }
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp
new file mode 100644
index 00000000000..8678d88b43c
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp
@@ -0,0 +1,200 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "client_session.h"
+#include "translogclient.h"
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".translog.client_session");
+
+using vespalib::LockGuard;
+using namespace std::chrono_literals;
+
+namespace search::transactionlog::client {
+
+SessionKey::SessionKey(const vespalib::string & domain, int sessionId)
+ : _domain(domain),
+ _sessionId(sessionId)
+{ }
+SessionKey::~SessionKey() = default;
+
+int
+SessionKey::cmp(const SessionKey & b) const
+{
+ int diff(strcmp(_domain.c_str(), b._domain.c_str()));
+ if (diff == 0) {
+ diff = _sessionId - b._sessionId;
+ }
+ return diff;
+}
+
+Session::Session(const vespalib::string & domain, TransLogClient & tlc)
+ : _tlc(tlc),
+ _domain(domain),
+ _sessionId(0)
+{
+}
+
+Session::~Session()
+{
+ close();
+ clear();
+}
+
+bool
+Session::commit(const vespalib::ConstBufferRef & buf)
+{
+ bool retval(true);
+ if (buf.size() != 0) {
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainCommit");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddData(buf.c_str(), buf.size());
+ int retcode = _tlc.rpc(req);
+ retval = (retcode == 0);
+ if (retval) {
+ req->SubRef();
+ } else {
+ vespalib::string msg;
+ if (req->GetReturn() != nullptr) {
+ msg = req->GetReturn()->GetValue(1)._string._str;
+ } else {
+ msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
+ }
+ req->SubRef();
+ throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str()));
+ }
+ }
+ return retval;
+}
+
+bool
+Session::status(SerialNum & b, SerialNum & e, size_t & count)
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainStatus");
+ req->GetParams()->AddString(_domain.c_str());
+ int32_t retval(_tlc.rpc(req));
+ if (retval == 0) {
+ b = req->GetReturn()->GetValue(1)._intval64;
+ e = req->GetReturn()->GetValue(2)._intval64;
+ count = req->GetReturn()->GetValue(3)._intval64;
+ }
+ req->SubRef();
+ return (retval == 0);
+}
+
+bool
+Session::erase(const SerialNum & to)
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainPrune");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddInt64(to);
+ int32_t retval(_tlc.rpc(req));
+ req->SubRef();
+ if (retval == 1) {
+ LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to);
+ }
+ return (retval == 0);
+}
+
+
+bool
+Session::sync(const SerialNum &syncTo, SerialNum &syncedTo)
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainSync");
+ FRT_Values & params = *req->GetParams();
+ params.AddString(_domain.c_str());
+ params.AddInt64(syncTo);
+ int32_t retval(_tlc.rpc(req));
+ if (retval == 0) {
+ syncedTo = req->GetReturn()->GetValue(1)._intval64;
+ }
+ req->SubRef();
+ return (retval == 0);
+}
+
+
+void
+Session::clear()
+{
+ if (_sessionId > 0) {
+ LockGuard guard(_tlc._lock);
+ _tlc._sessions.erase(SessionKey(_domain, _sessionId));
+ }
+ _sessionId = 0;
+}
+
+Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) :
+ Session(domain, tlc),
+ _callback(callBack)
+{
+}
+
+bool
+Session::init(FRT_RPCRequest *req)
+{
+ int32_t retval(_tlc.rpc(req));
+ req->SubRef();
+ if (retval > 0) {
+ clear();
+ _sessionId = retval;
+ SessionKey key(_domain, _sessionId);
+ {
+ LockGuard guard(_tlc._lock);
+ _tlc._sessions[key] = this;
+ }
+ retval = run();
+ }
+ return (retval > 0);
+}
+
+bool
+Visitor::visit(const SerialNum & from, const SerialNum & to)
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainVisit");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddInt64(from);
+ req->GetParams()->AddInt64(to);
+ return init(req);
+}
+
+bool
+Session::run()
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainSessionRun");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddInt32(_sessionId);
+ int32_t retval(_tlc.rpc(req));
+ req->SubRef();
+ return (retval == 0);
+}
+
+bool
+Session::close()
+{
+ int retval(0);
+ if (_sessionId > 0) {
+ do {
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainSessionClose");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddInt32(_sessionId);
+ if ( (retval = _tlc.rpc(req)) > 0) {
+ std::this_thread::sleep_for(10ms);
+ }
+ req->SubRef();
+ } while ( retval == 1 );
+ }
+ return (retval == 0);
+}
+
+Visitor::~Visitor() = default;
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.h b/searchlib/src/vespa/searchlib/transactionlog/client_session.h
new file mode 100644
index 00000000000..49f24d83aaf
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.h
@@ -0,0 +1,68 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "client_common.h"
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/util/buffer.h>
+#include <vespa/vespalib/stllike/string.h>
+
+class FRT_RPCRequest;
+
+namespace search::transactionlog::client {
+
+class TransLogClient;
+
+class SessionKey
+{
+public:
+ SessionKey(const vespalib::string & domain, int sessionId);
+ ~SessionKey();
+ bool operator < (const SessionKey & b) const { return cmp(b) < 0; }
+private:
+ int cmp(const SessionKey & b) const;
+ vespalib::string _domain;
+ int _sessionId;
+};
+
+class Session
+{
+public:
+ Session(const vespalib::string & domain, TransLogClient & tlc);
+ virtual ~Session();
+ /// You can commit data of any registered type to any channel.
+ bool commit(const vespalib::ConstBufferRef & packet);
+ /// Will erase all entries prior to <to>
+ bool erase(const SerialNum & to);
+ bool status(SerialNum & b, SerialNum & e, size_t & count);
+
+ bool sync(const SerialNum &syncTo, SerialNum &syncedTo);
+
+ virtual RPC::Result visit(const Packet & ) { return RPC::OK; }
+ virtual void eof() { }
+ bool close();
+ void clear();
+ const vespalib::string & getDomain() const { return _domain; }
+ const TransLogClient & getTLC() const { return _tlc; }
+protected:
+ bool init(FRT_RPCRequest * req);
+ bool run();
+ TransLogClient & _tlc;
+ vespalib::string _domain;
+ int _sessionId;
+};
+
+/// Here you connect to the incomming data getting everything from <from>
+class Visitor : public Session
+{
+public:
+ Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
+ bool visit(const SerialNum & from, const SerialNum & to);
+ virtual ~Visitor() override;
+ RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); }
+ void eof() override { _callback.eof(); }
+private:
+ Callback & _callback;
+};
+
+}
+
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index c5427c5b401..8dba5d448f8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -10,14 +10,7 @@
namespace search::transactionlog {
/// This represents a type of the entry. Fx update,remove
-typedef uint32_t Type;
-/// A channel represents one data stream.
-
-class RPC
-{
-public:
-enum Result { OK, FULL, ERROR };
-};
+using Type = uint32_t;
class SerialNumRange
{
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
index 2c6c1e249f4..84919a59a97 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
@@ -1,12 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogclient.h"
+#include "common.h"
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fnet/transport.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fastos/thread.h>
-#include <thread>
+
#include <vespa/log/log.h>
LOG_SETUP(".translogclient");
@@ -15,7 +17,7 @@ using namespace std::chrono_literals;
VESPA_THREAD_STACK_TAG(translogclient_rpc_callback)
-namespace search::transactionlog {
+namespace search::transactionlog::client {
namespace {
const double NEVER(-1.0);
@@ -33,7 +35,7 @@ struct RpcTask : public vespalib::Executor::Task {
req->Return();
req = nullptr;
}
- ~RpcTask() {
+ ~RpcTask() override {
if (req != nullptr) {
req->SetError(FRTE_RPC_METHOD_FAILED, "client has been shut down");
req->Return();
@@ -46,13 +48,13 @@ struct RpcTask : public vespalib::Executor::Task {
using vespalib::LockGuard;
TransLogClient::TransLogClient(const vespalib::string & rpcTarget) :
- _executor(1, 128 * 1024, translogclient_rpc_callback),
+ _executor(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, translogclient_rpc_callback)),
_rpcTarget(rpcTarget),
_sessions(),
_threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
_transport(std::make_unique<FNET_Transport>()),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
- _target(NULL)
+ _target(nullptr)
{
reconnect();
exportRPC(*_supervisor);
@@ -62,29 +64,33 @@ TransLogClient::TransLogClient(const vespalib::string & rpcTarget) :
TransLogClient::~TransLogClient()
{
disconnect();
- _executor.shutdown().sync();
+ _executor->shutdown().sync();
_transport->ShutDown(true);
}
-bool TransLogClient::reconnect()
+bool
+TransLogClient::reconnect()
{
disconnect();
_target = _supervisor->Get2WayTarget(_rpcTarget.c_str());
return isConnected();
}
-bool TransLogClient::isConnected() const {
- return (_target != NULL) && _target->IsValid();
+bool
+TransLogClient::isConnected() const {
+ return (_target != nullptr) && _target->IsValid();
}
-void TransLogClient::disconnect()
+void
+TransLogClient::disconnect()
{
if (_target) {
_target->SubRef();
}
}
-bool TransLogClient::create(const vespalib::string & domain)
+bool
+TransLogClient::create(const vespalib::string & domain)
{
FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("createDomain");
@@ -94,7 +100,8 @@ bool TransLogClient::create(const vespalib::string & domain)
return (retval == 0);
}
-bool TransLogClient::remove(const vespalib::string & domain)
+bool
+TransLogClient::remove(const vespalib::string & domain)
{
FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("deleteDomain");
@@ -104,27 +111,28 @@ bool TransLogClient::remove(const vespalib::string & domain)
return (retval == 0);
}
-TransLogClient::Session::UP TransLogClient::open(const vespalib::string & domain)
+std::unique_ptr<Session>
+TransLogClient::open(const vespalib::string & domain)
{
- Session::UP session;
FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("openDomain");
req->GetParams()->AddString(domain.c_str());
int32_t retval(rpc(req));
+ req->SubRef();
if (retval == 0) {
- session.reset(new Session(domain, *this));
+ return std::make_unique<Session>(domain, *this);
}
- req->SubRef();
- return session;
+ return std::unique_ptr<Session>();
}
-TransLogClient::Visitor::UP
-TransLogClient::createVisitor(const vespalib::string & domain, TransLogClient::Session::Callback & callBack)
+std::unique_ptr<Visitor>
+TransLogClient::createVisitor(const vespalib::string & domain, Callback & callBack)
{
- return TransLogClient::Visitor::UP(new Visitor(domain, *this, callBack));
+ return std::make_unique<Visitor>(domain, *this, callBack);
}
-bool TransLogClient::listDomains(std::vector<vespalib::string> & dir)
+bool
+TransLogClient::listDomains(std::vector<vespalib::string> & dir)
{
FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("listDomains");
@@ -139,7 +147,8 @@ bool TransLogClient::listDomains(std::vector<vespalib::string> & dir)
return (retval == 0);
}
-int32_t TransLogClient::rpc(FRT_RPCRequest * req)
+int32_t
+TransLogClient::rpc(FRT_RPCRequest * req)
{
int32_t retval(-7);
if (_target) {
@@ -156,15 +165,17 @@ int32_t TransLogClient::rpc(FRT_RPCRequest * req)
return retval;
}
-TransLogClient::Session * TransLogClient::findSession(const vespalib::string & domainName, int sessionId)
+Session *
+TransLogClient::findSession(const vespalib::string & domainName, int sessionId)
{
SessionKey key(domainName, sessionId);
SessionMap::iterator found(_sessions.find(key));
- Session * session((found != _sessions.end()) ? found->second : NULL);
+ Session * session((found != _sessions.end()) ? found->second : nullptr);
return session;
}
-void TransLogClient::exportRPC(FRT_Supervisor & supervisor)
+void
+TransLogClient::exportRPC(FRT_Supervisor & supervisor)
{
FRT_ReflectionBuilder rb( & supervisor);
@@ -185,7 +196,8 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor)
}
-void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
+void
+TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -194,7 +206,7 @@ void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
int32_t sessionId(params[1]._intval32);
LOG(spam, "visitCallback(%s, %d)(%d)", domainName, sessionId, params[2]._data._len);
Session * session(findSession(domainName, sessionId));
- if (session != NULL) {
+ if (session != nullptr) {
Packet packet(params[2]._data._buf, params[2]._data._len);
retval = session->visit(packet);
}
@@ -202,7 +214,8 @@ void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
LOG(debug, "visitCallback(%s, %d)=%d done", domainName, sessionId, retval);
}
-void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
+void
+TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -211,7 +224,7 @@ void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
int32_t sessionId(params[1]._intval32);
LOG(debug, "eofCallback(%s, %d)", domainName, sessionId);
Session * session(findSession(domainName, sessionId));
- if (session != NULL) {
+ if (session != nullptr) {
session->eof();
retval = 0;
}
@@ -219,183 +232,16 @@ void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
LOG(debug, "eofCallback(%s, %d)=%d done", domainName, sessionId, retval);
}
-void TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req)
-{
- _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); }));
-}
-
-void TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req)
-{
- _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); }));
-}
-
-
-TransLogClient::Session::Session(const vespalib::string & domain, TransLogClient & tlc) :
- _tlc(tlc),
- _domain(domain),
- _sessionId(0)
-{
-}
-
-TransLogClient::Session::~Session()
-{
- close();
- clear();
-}
-
-bool TransLogClient::Session::commit(const vespalib::ConstBufferRef & buf)
-{
- bool retval(true);
- if (buf.size() != 0) {
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainCommit");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddData(buf.c_str(), buf.size());
- int retcode = _tlc.rpc(req);
- retval = (retcode == 0);
- if (retval) {
- req->SubRef();
- } else {
- vespalib::string msg;
- if (req->GetReturn() != 0) {
- msg = req->GetReturn()->GetValue(1)._string._str;
- } else {
- msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
- }
- req->SubRef();
- throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str()));
- }
- }
- return retval;
-}
-
-bool TransLogClient::Session::status(SerialNum & b, SerialNum & e, size_t & count)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainStatus");
- req->GetParams()->AddString(_domain.c_str());
- int32_t retval(_tlc.rpc(req));
- if (retval == 0) {
- b = req->GetReturn()->GetValue(1)._intval64;
- e = req->GetReturn()->GetValue(2)._intval64;
- count = req->GetReturn()->GetValue(3)._intval64;
- }
- req->SubRef();
- return (retval == 0);
-}
-
-bool TransLogClient::Session::erase(const SerialNum & to)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainPrune");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt64(to);
- int32_t retval(_tlc.rpc(req));
- req->SubRef();
- if (retval == 1) {
- LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to);
- }
- return (retval == 0);
-}
-
-
-bool
-TransLogClient::Session::sync(const SerialNum &syncTo, SerialNum &syncedTo)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainSync");
- FRT_Values & params = *req->GetParams();
- params.AddString(_domain.c_str());
- params.AddInt64(syncTo);
- int32_t retval(_tlc.rpc(req));
- if (retval == 0) {
- syncedTo = req->GetReturn()->GetValue(1)._intval64;
- }
- req->SubRef();
- return (retval == 0);
-}
-
-
-void TransLogClient::Session::clear()
-{
- if (_sessionId > 0) {
- LockGuard guard(_tlc._lock);
- _tlc._sessions.erase(SessionKey(_domain, _sessionId));
- }
- _sessionId = 0;
-}
-
-int TransLogClient::SessionKey::cmp(const TransLogClient::SessionKey & b) const
+void
+TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req)
{
- int diff(strcmp(_domain.c_str(), b._domain.c_str()));
- if (diff == 0) {
- diff = _sessionId - b._sessionId;
- }
- return diff;
+ _executor->execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); }));
}
-TransLogClient::Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) :
- Session(domain, tlc),
- _callback(callBack)
+void
+TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req)
{
+ _executor->execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); }));
}
-bool TransLogClient::Session::init(FRT_RPCRequest *req)
-{
- int32_t retval(_tlc.rpc(req));
- req->SubRef();
- if (retval > 0) {
- clear();
- _sessionId = retval;
- SessionKey key(_domain, _sessionId);
- {
- LockGuard guard(_tlc._lock);
- _tlc._sessions[key] = this;
- }
- retval = run();
- }
- return (retval > 0);
-}
-
-bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainVisit");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt64(from);
- req->GetParams()->AddInt64(to);
- return init(req);
-}
-
-bool TransLogClient::Session::run()
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainSessionRun");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt32(_sessionId);
- int32_t retval(_tlc.rpc(req));
- req->SubRef();
- return (retval == 0);
-}
-
-bool TransLogClient::Session::close()
-{
- int retval(0);
- if (_sessionId > 0) {
- do {
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainSessionClose");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt32(_sessionId);
- if ( (retval = _tlc.rpc(req)) > 0) {
- std::this_thread::sleep_for(10ms);
- }
- req->SubRef();
- } while ( retval == 1 );
- }
- return (retval == 0);
-}
-
-TransLogClient::Visitor::~Visitor() = default;
-
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
index 38c30cd5b4c..289a0fcb8c0 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
@@ -1,11 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "common.h"
-#include <vespa/document/util/bytebuffer.h>
+#include "client_common.h"
+#include "client_session.h"
#include <vespa/vespalib/util/sync.h>
-#include <vespa/vespalib/util/buffer.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fnet/frt/invokable.h>
#include <map>
#include <vector>
@@ -13,90 +11,39 @@
class FNET_Transport;
class FRT_Supervisor;
class FRT_Target;
+class FastOS_ThreadPool;
-namespace search::transactionlog {
+namespace vespalib { class ThreadStackExecutorBase; }
+namespace search::transactionlog::client {
+
+class Session;
+class Visitor;
class TransLogClient : private FRT_Invokable
{
-private:
- TransLogClient(const TransLogClient &);
- TransLogClient& operator=(const TransLogClient &);
-
public:
- class Session
- {
- public:
- class Callback {
- public:
- virtual ~Callback() { }
- virtual RPC::Result receive(const Packet & packet) = 0;
- virtual void eof() { }
- };
- public:
- typedef std::unique_ptr<Session> UP;
- typedef std::shared_ptr<Session> SP;
-
- Session(const vespalib::string & domain, TransLogClient & tlc);
- virtual ~Session();
- /// You can commit data of any registered type to any channel.
- bool commit(const vespalib::ConstBufferRef & packet);
- /// Will erase all entries prior to <to>
- bool erase(const SerialNum & to);
- bool status(SerialNum & b, SerialNum & e, size_t & count);
-
- bool sync(const SerialNum &syncTo, SerialNum &syncedTo);
-
- virtual RPC::Result visit(const Packet & ) { return RPC::OK; }
- virtual void eof() { }
- bool close();
- void clear();
- const vespalib::string & getDomain() const { return _domain; }
- const TransLogClient & getTLC() const { return _tlc; }
- protected:
- bool init(FRT_RPCRequest * req);
- bool run();
- TransLogClient & _tlc;
- vespalib::string _domain;
- int _sessionId;
- };
- /// Here you connect to the incomming data getting everything from <from>
- class Visitor : public Session
- {
- public:
- typedef std::unique_ptr<Visitor> UP;
- typedef std::shared_ptr<Visitor> SP;
-
- Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
- bool visit(const SerialNum & from, const SerialNum & to);
- virtual ~Visitor();
- RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); }
- void eof() override { _callback.eof(); }
- private:
- Callback & _callback;
- };
- /// Here you read the incomming data getting everything from <from>
-
-public:
- typedef std::unique_ptr<TransLogClient> UP;
-
TransLogClient(const vespalib::string & rpctarget);
- virtual ~TransLogClient();
+ TransLogClient(const TransLogClient &) = delete;
+ TransLogClient& operator=(const TransLogClient &) = delete;
+ ~TransLogClient() override;
/// Here you create a new domain
bool create(const vespalib::string & domain);
/// Here you remove a domain
bool remove(const vespalib::string & domain);
/// Here you open an existing domain
- Session::UP open(const vespalib::string & domain);
+ std::unique_ptr<Session> open(const vespalib::string & domain);
/// Here you can get a list of available domains.
bool listDomains(std::vector<vespalib::string> & dir);
- Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack);
+ std::unique_ptr<Visitor> createVisitor(const vespalib::string & domain, Callback & callBack);
bool isConnected() const;
void disconnect();
bool reconnect();
const vespalib::string &getRPCTarget() const { return _rpcTarget; }
private:
+ friend Session;
+ friend Visitor;
void exportRPC(FRT_Supervisor & supervisor);
void do_visitCallbackRPC(FRT_RPCRequest *req);
void do_eofCallbackRPC(FRT_RPCRequest *req);
@@ -105,22 +52,11 @@ private:
int32_t rpc(FRT_RPCRequest * req);
Session * findSession(const vespalib::string & domain, int sessionId);
- class SessionKey
- {
- public:
- SessionKey(const vespalib::string & domain, int sessionId) : _domain(domain), _sessionId(sessionId) { }
- bool operator < (const SessionKey & b) const { return cmp(b) < 0; }
- private:
- int cmp(const SessionKey & b) const;
- vespalib::string _domain;
- int _sessionId;
- };
-
- typedef std::map< SessionKey, Session * > SessionMap;
+ using SessionMap = std::map< SessionKey, Session * >;
- vespalib::ThreadStackExecutor _executor;
- vespalib::string _rpcTarget;
- SessionMap _sessions;
+ std::unique_ptr<vespalib::ThreadStackExecutorBase> _executor;
+ vespalib::string _rpcTarget;
+ SessionMap _sessions;
//Brute force lock for subscriptions. For multithread safety.
vespalib::Lock _lock;
std::unique_ptr<FastOS_ThreadPool> _threadPool;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index a72731b661e..64e472520a5 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogserver.h"
#include "domain.h"
+#include "client_common.h"
#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/io/fileutil.h>
@@ -399,12 +400,12 @@ public:
private:
bool send(FRT_RPCRequest * req) {
int32_t retval = rpc(req);
- if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
+ if ( ! ((retval == client::RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
}
req->SubRef();
- return (retval == RPC::OK);
+ return (retval == client::RPC::OK);
}
int32_t rpc(FRT_RPCRequest * req) {
int32_t retval(-7);