aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp16
-rw-r--r--searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/packetwrapper.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h27
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp116
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp19
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt1
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/client_common.h20
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/client_session.cpp200
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/client_session.h68
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h9
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp250
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.h102
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp5
21 files changed, 506 insertions, 414 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
index 2f34292ad52..a7c53293981 100644
--- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
+++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
@@ -286,7 +286,7 @@ public:
* and dispatches each packet entry to the ReplayPacketDispatcher that
* transforms them into concrete operations.
*/
-class VisitorCallback : public TransLogClient::Session::Callback
+class VisitorCallback : public client::Callback
{
private:
ReplayPacketDispatcher _dispatcher;
@@ -298,7 +298,7 @@ public:
_eof(false)
{
}
- virtual RPC::Result receive(const Packet &packet) override {
+ client::RPC::Result receive(const Packet &packet) override {
vespalib::nbostream_longlivedbuf handle(packet.getHandle().data(), packet.getHandle().size());
try {
while (handle.size() > 0) {
@@ -309,11 +309,11 @@ public:
} catch (const std::exception &e) {
std::cerr << "Error while handling transaction log packet: '"
<< std::string(e.what()) << "'" << std::endl;
- return RPC::ERROR;
+ return client::RPC::ERROR;
}
- return RPC::OK;
+ return client::RPC::OK;
}
- virtual void eof() override { _eof = true; }
+ void eof() override { _eof = true; }
bool isEof() const { return _eof; }
};
@@ -371,7 +371,7 @@ protected:
const BaseOptions &_bopts;
DummyFileHeaderContext _fileHeader;
TransLogServer _server;
- TransLogClient _client;
+ client::TransLogClient _client;
public:
BaseUtility(const BaseOptions &bopts)
@@ -416,7 +416,7 @@ public:
_client.listDomains(domains);
std::cout << "Listing status for " << domains.size() << " domain(s):" << std::endl;
for (size_t i = 0; i < domains.size(); ++i) {
- TransLogClient::Session::UP session = _client.open(domains[i]);
+ std::unique_ptr<client::Session> session = _client.open(domains[i]);
SerialNum first;
SerialNum last;
size_t count;
@@ -484,7 +484,7 @@ protected:
DocTypeRepo repo(_oopts.configDir);
IReplayPacketHandlerUP handler = createHandler(repo.docTypeRepo);
VisitorCallback callback(*handler);
- TransLogClient::Visitor::UP visitor = _client.createVisitor(_oopts.domainName, callback);
+ std::unique_ptr<client::Visitor> visitor = _client.createVisitor(_oopts.domainName, callback);
bool visitOk = visitor->visit(_oopts.firstSerialNum-1, _oopts.lastSerialNum);
if (!visitOk) {
std::cerr << "Visiting domain '" << _oopts.domainName << "' [" << _oopts.firstSerialNum << ","
diff --git a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp
index edb4250ce76..2df7c5d629d 100644
--- a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp
+++ b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp
@@ -11,7 +11,7 @@
#include <vespa/vespalib/util/stringfmt.h>
using search::index::DummyFileHeaderContext;
-using search::transactionlog::TransLogClient;
+using search::transactionlog::client::TransLogClient;
using search::transactionlog::TransLogServer;
using proton::DocTypeName;
using proton::ProtonDiskLayout;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index a1afd7a0bb8..8a117f112b4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -19,6 +19,7 @@
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/searchcorespi/index/ithreadingservice.h>
#include <vespa/searchlib/common/gatecallback.h>
+#include <vespa/searchlib/transactionlog/client_session.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <unistd.h>
@@ -331,7 +332,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
IReplayConfig &replayConfig,
search::transactionlog::Writer & tlsDirectWriter,
TlsWriter * tlsWriter)
- : search::transactionlog::TransLogClient::Session::Callback(),
+ : search::transactionlog::client::Callback(),
IDocumentMoveHandler(),
IPruneRemovedDocumentsHandler(),
IHeartBeatHandler(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index b7fc2733f49..d70cefb288b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -13,7 +13,7 @@
#include <vespa/document/bucket/bucketid.h>
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
-#include <vespa/searchlib/transactionlog/translogclient.h>
+#include <vespa/searchlib/transactionlog/client_common.h>
#include <shared_mutex>
namespace searchcorespi::index { struct IThreadingService; }
@@ -43,7 +43,7 @@ namespace bucketdb { class IBucketDBHandler; }
* Class handling all aspects of feeding for a document database.
* In addition to regular feeding this also includes handling the transaction log.
*/
-class FeedHandler: private search::transactionlog::TransLogClient::Session::Callback,
+class FeedHandler: private search::transactionlog::client::Callback,
public IDocumentMoveHandler,
public IPruneRemovedDocumentsHandler,
public IHeartBeatHandler,
@@ -52,7 +52,7 @@ class FeedHandler: private search::transactionlog::TransLogClient::Session::Call
{
private:
using Packet = search::transactionlog::Packet;
- using RPC = search::transactionlog::RPC;
+ using RPC = search::transactionlog::client::RPC;
using SerialNum = search::SerialNum;
using Timestamp = storage::spi::Timestamp;
using BucketId = document::BucketId;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index d01c25d9c1e..5214e13fe79 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -16,7 +16,7 @@
LOG_SETUP(".proton.server.feedstates");
using search::transactionlog::Packet;
-using search::transactionlog::RPC;
+using search::transactionlog::client::RPC;
using search::SerialNum;
using vespalib::Executor;
using vespalib::makeClosure;
diff --git a/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h b/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h
index 6224b3b693a..c36652ec847 100644
--- a/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h
+++ b/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h
@@ -4,6 +4,7 @@
#include "tls_replay_progress.h"
#include <vespa/searchlib/transactionlog/common.h>
+#include <vespa/searchlib/transactionlog/client_common.h>
#include <vespa/vespalib/util/gate.h>
namespace proton {
@@ -16,14 +17,14 @@ struct PacketWrapper {
const search::transactionlog::Packet &packet;
TlsReplayProgress *progress;
- search::transactionlog::RPC::Result result;
+ search::transactionlog::client::RPC::Result result;
vespalib::Gate gate;
PacketWrapper(const search::transactionlog::Packet &p,
TlsReplayProgress *progress_)
: packet(p),
progress(progress_),
- result(search::transactionlog::RPC::ERROR),
+ result(search::transactionlog::client::RPC::ERROR),
gate()
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp
index 31fd44eec5e..23289296ada 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp
@@ -2,15 +2,13 @@
#include "proton_disk_layout.h"
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/fastos/file.h>
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/searchlib/transactionlog/translogclient.h>
-#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.proton_disk_layout");
-using search::transactionlog::TransLogClient;
+using search::transactionlog::client::TransLogClient;
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
index 3ad98cba3ac..fdc9b6d7807 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
@@ -2,6 +2,7 @@
#include "configstore.h"
#include "transactionlogmanager.h"
+#include <vespa/searchlib/transactionlog/translogclient.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -11,11 +12,11 @@ LOG_SETUP(".proton.server.transactionlogmanager");
using vespalib::IllegalStateException;
using vespalib::make_string;
-using search::transactionlog::TransLogClient;
+using search::transactionlog::client::TransLogClient;
+using search::transactionlog::client::Session;
namespace proton {
-
void
TransactionLogManager::doLogReplayComplete(const vespalib::string &domainName,
vespalib::duration elapsedTime) const
@@ -45,10 +46,8 @@ TransactionLogManager::init(SerialNum oldestConfigSerial, SerialNum &prunedSeria
namespace {
-void getStatus(TransLogClient::Session & session,
- search::SerialNum & serialBegin,
- search::SerialNum & serialEnd,
- size_t & count)
+void
+getStatus(Session & session, search::SerialNum & serialBegin, search::SerialNum & serialEnd, size_t & count)
{
if (!session.status(serialBegin, serialEnd, count)) {
throw IllegalStateException(
@@ -66,7 +65,7 @@ void getStatus(TransLogClient & client,
search::SerialNum & serialEnd,
size_t & count)
{
- TransLogClient::Session::UP session = client.open(domainName);
+ std::unique_ptr<Session> session = client.open(domainName);
if ( ! session) {
throw IllegalStateException(
make_string(
@@ -117,7 +116,7 @@ TransactionLogManager::prepareReplay(TransLogClient &client,
TlsReplayProgress::UP
TransactionLogManager::startReplay(SerialNum first,
SerialNum syncToken,
- TransLogClient::Session::Callback &callback)
+ Callback &callback)
{
assert( !_visitor);
_visitor = createTlcVisitor(callback);
@@ -142,7 +141,7 @@ TransactionLogManager::startReplay(SerialNum first,
getDomainName().c_str(),
first, syncToken, getRpcTarget().c_str()));
}
- return TlsReplayProgress::UP(new TlsReplayProgress(getDomainName(), first, syncToken));
+ return std::make_unique<TlsReplayProgress>(getDomainName(), first, syncToken);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
index 58444351e8f..32532e3f656 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
@@ -13,7 +13,7 @@ struct ConfigStore;
**/
class TransactionLogManager : public TransactionLogManagerBase
{
- TransLogClient::Visitor::UP _visitor;
+ std::unique_ptr<Visitor> _visitor;
void doLogReplayComplete(const vespalib::string &domainName, vespalib::duration elapsedTime) const override;
@@ -51,7 +51,7 @@ public:
/**
* Start replay of the transaction log.
**/
- TlsReplayProgress::UP startReplay(SerialNum first, SerialNum syncToken, TransLogClient::Session::Callback &callback);
+ TlsReplayProgress::UP startReplay(SerialNum first, SerialNum syncToken, Callback &callback);
/**
* Indicate that replay is done.
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
index 8b18a7ae566..a8ecb2ba07b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
@@ -1,19 +1,20 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "transactionlogmanagerbase.h"
+#include <vespa/searchlib/transactionlog/translogclient.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.transactionlogmanagerbase");
-using search::transactionlog::TransLogClient;
+using search::transactionlog::client::Visitor;
namespace proton {
TransactionLogManagerBase::TransactionLogManagerBase(
const vespalib::string &tlsSpec, const vespalib::string &domainName) :
- _tlc(tlsSpec),
+ _tlc(std::make_unique<TransLogClient>(tlsSpec)),
_tlcSession(),
_domainName(domainName),
_replayLock(),
@@ -29,31 +30,31 @@ TransactionLogManagerBase::~TransactionLogManagerBase() = default;
TransactionLogManagerBase::StatusResult
TransactionLogManagerBase::init()
{
- TransLogClient::Session::UP session = _tlc.open(_domainName);
+ std::unique_ptr<Session> session = _tlc->open(_domainName);
if ( ! session) {
- if (!_tlc.create(_domainName)) {
+ if (!_tlc->create(_domainName)) {
vespalib::string str = vespalib::make_string(
"Failed creating domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
throw std::runtime_error(str);
}
LOG(debug, "Created domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
- session = _tlc.open(_domainName);
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
+ session = _tlc->open(_domainName);
if ( ! session) {
vespalib::string str = vespalib::make_string(
"Could not open session for domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
throw std::runtime_error(str);
}
}
LOG(debug, "Opened domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
StatusResult res;
if (!session->status(res.serialBegin, res.serialEnd, res.count)) {
vespalib::string str = vespalib::make_string(
"Could not get status from session with domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
throw std::runtime_error(str);
}
LOG(debug,
@@ -72,7 +73,8 @@ TransactionLogManagerBase::internalStartReplay()
_replayStopWatch = vespalib::Timer();
}
-void TransactionLogManagerBase::changeReplayDone()
+void
+TransactionLogManagerBase::changeReplayDone()
{
std::lock_guard<std::mutex> guard(_replayLock);
_replayDone = true;
@@ -101,23 +103,31 @@ TransactionLogManagerBase::close()
}
}
-TransLogClient::Visitor::UP
-TransactionLogManagerBase::createTlcVisitor(TransLogClient::Session::Callback &callback) {
- return _tlc.createVisitor(_domainName, callback);
+std::unique_ptr<Visitor>
+TransactionLogManagerBase::createTlcVisitor(Callback &callback) {
+ return _tlc->createVisitor(_domainName, callback);
}
-bool TransactionLogManagerBase::getReplayDone() const {
+bool
+TransactionLogManagerBase::getReplayDone() const {
std::lock_guard<std::mutex> guard(_replayLock);
return _replayDone;
}
-bool TransactionLogManagerBase::isDoingReplay() const {
+bool
+TransactionLogManagerBase::isDoingReplay() const {
std::lock_guard<std::mutex> guard(_replayLock);
return _replayStarted && !_replayDone;
}
-void TransactionLogManagerBase::logReplayComplete() const {
+void
+TransactionLogManagerBase::logReplayComplete() const {
doLogReplayComplete(_domainName, _replayStopWatch.elapsed());
}
+const vespalib::string &
+TransactionLogManagerBase::getRpcTarget() const {
+ return _tlc->getRPCTarget();
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
index 4b5d001a28e..7059604dfe7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
@@ -2,11 +2,17 @@
#pragma once
-#include <vespa/searchlib/transactionlog/translogclient.h>
+#include <vespa/searchlib/common/serialnum.h>
#include <vespa/vespalib/util/time.h>
#include <mutex>
#include <condition_variable>
+namespace search::transactionlog::client {
+ class TransLogClient;
+ class Session;
+ class Visitor;
+ class Callback;
+}
namespace proton {
/**
@@ -14,10 +20,13 @@ namespace proton {
**/
class TransactionLogManagerBase {
protected:
- using TransLogClient = search::transactionlog::TransLogClient;
+ using TransLogClient = search::transactionlog::client::TransLogClient;
+ using Session = search::transactionlog::client::Session;
+ using Visitor = search::transactionlog::client::Visitor;
+ using Callback = search::transactionlog::client::Callback;
private:
- TransLogClient _tlc;
- TransLogClient::Session::UP _tlcSession;
+ std::unique_ptr<TransLogClient> _tlc;
+ std::unique_ptr<Session> _tlcSession;
vespalib::string _domainName;
mutable std::mutex _replayLock;
mutable std::condition_variable _replayCond;
@@ -26,7 +35,7 @@ private:
vespalib::Timer _replayStopWatch;
protected:
- typedef search::SerialNum SerialNum;
+ using SerialNum = search::SerialNum;
struct StatusResult {
SerialNum serialBegin;
@@ -55,17 +64,17 @@ public:
void changeReplayDone();
void close();
- TransLogClient::Visitor::UP createTlcVisitor(TransLogClient::Session::Callback &callback);
+ std::unique_ptr<Visitor> createTlcVisitor(Callback &callback);
void waitForReplayDone() const;
- TransLogClient &getClient() { return _tlc; }
- TransLogClient::Session *getSession() { return _tlcSession.get(); }
+ TransLogClient &getClient() { return *_tlc; }
+ Session *getSession() { return _tlcSession.get(); }
const vespalib::string &getDomainName() const { return _domainName; }
bool getReplayDone() const;
bool isDoingReplay() const;
void logReplayComplete() const;
- const vespalib::string &getRpcTarget() const { return _tlc.getRPCTarget(); }
+ const vespalib::string &getRpcTarget() const;
};
} // namespace proton
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index a20e0cc3aaa..e6cf44cb697 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/objects/identifiable.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
+#include <vespa/document/util/bytebuffer.h>
#include <vespa/fastos/file.h>
#include <vespa/log/log.h>
@@ -15,17 +16,24 @@ using namespace document;
using namespace vespalib;
using namespace std::chrono_literals;
using search::index::DummyFileHeaderContext;
+using search::transactionlog::client::TransLogClient;
+using search::transactionlog::client::Session;
+using search::transactionlog::client::Visitor;
+using search::transactionlog::client::RPC;
+using search::transactionlog::client::Callback;
+using SessionUP = std::unique_ptr<Session>;
+using VisitorUP = std::unique_ptr<Visitor>;
namespace {
bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0);
-TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name);
-bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name);
-void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries);
-void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
+SessionUP openDomainTest(TransLogClient & tls, const vespalib::string & name);
+bool fillDomainTest(Session * s1, const vespalib::string & name);
+void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries);
+void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
uint32_t countFiles(const vespalib::string &dir);
-void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
-bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
+void checkFilledDomainTest(const SessionUP &s1, size_t numEntries);
+bool visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & name);
void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains);
void verifyDomain(const vespalib::string & name);
@@ -43,7 +51,7 @@ myhex(const void * b, size_t sz)
return s;
}
-class CallBackTest : public TransLogClient::Visitor::Callback
+class CallBackTest : public Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
@@ -74,7 +82,7 @@ CallBackTest::receive(const Packet & p)
return RPC::OK;
}
-class CallBackManyTest : public TransLogClient::Visitor::Callback
+class CallBackManyTest : public Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
@@ -103,7 +111,7 @@ CallBackManyTest::receive(const Packet & p)
return RPC::OK;
}
-class CallBackUpdate : public TransLogClient::Visitor::Callback
+class CallBackUpdate : public Callback
{
public:
typedef std::map<SerialNum, Identifiable *> PacketMap;
@@ -153,7 +161,7 @@ CallBackUpdate::receive(const Packet & packet)
return RPC::OK;
}
-class CallBackStatsTest : public TransLogClient::Session::Callback
+class CallBackStatsTest : public Callback
{
private:
virtual RPC::Result receive(const Packet & packet) override;
@@ -219,8 +227,8 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre
std::vector<vespalib::string> dir;
tls.listDomains(dir);
EXPECT_EQUAL (dir.size(), preExistingDomains);
- TransLogClient::Session::UP s1 = tls.open(name);
- ASSERT_TRUE (s1.get() == NULL);
+ SessionUP s1 = tls.open(name);
+ ASSERT_FALSE (s1);
retval = tls.create(name);
ASSERT_TRUE (retval);
dir.clear();
@@ -230,16 +238,16 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre
return retval;
}
-TransLogClient::Session::UP
+SessionUP
openDomainTest(TransLogClient & tls, const vespalib::string & name)
{
- TransLogClient::Session::UP s1 = tls.open(name);
- ASSERT_TRUE (s1.get() != NULL);
+ SessionUP s1 = tls.open(name);
+ ASSERT_TRUE (s1);
return s1;
}
bool
-fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
+fillDomainTest(Session * s1, const vespalib::string & name)
{
bool retval(true);
Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20));
@@ -279,7 +287,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
}
void
-fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries)
+fillDomainTest(Session * s1, size_t numPackets, size_t numEntries)
{
size_t value(0);
for(size_t i=0; i < numPackets; i++) {
@@ -333,7 +341,7 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP
void
-fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
+fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
{
size_t value(0);
std::vector<char> entryBuffer(entrySize);
@@ -368,7 +376,7 @@ countFiles(const vespalib::string &dir)
}
void
-checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
+checkFilledDomainTest(const SessionUP &s1, size_t numEntries)
{
SerialNum b(0), e(0);
size_t c(0);
@@ -379,7 +387,7 @@ checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
}
bool
-visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name)
+visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & name)
{
bool retval(true);
@@ -391,7 +399,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal
EXPECT_EQUAL(c, 3u);
CallBackTest ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor(name, ca);
+ VisitorUP visitor = tls.createVisitor(name, ca);
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 1) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -451,7 +459,7 @@ void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, name, preExistingDomains);
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
}
@@ -459,7 +467,7 @@ void verifyDomain(const vespalib::string & name) {
DummyFileHeaderContext fileHeaderContext;
TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
visitDomainTest(tls, s1.get(), name);
}
@@ -472,7 +480,7 @@ TEST("testVisitOverGeneratedDomain") {
vespalib::string name("test1");
createDomainTest(tls, name);
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
EXPECT_EQUAL(0, getMaxSessionRunTime(tlss, "test1"));
visitDomainTest(tls, s1.get(), name);
@@ -488,7 +496,7 @@ TEST("testVisitOverPreExistingDomain") {
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test1");
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
visitDomainTest(tls, s1.get(), name);
}
@@ -497,8 +505,8 @@ TEST("partialUpdateTest") {
TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
- TransLogClient::Session & session = *s1;
+ SessionUP s1 = openDomainTest(tls, "test1");
+ Session & session = *s1;
TestIdentifiable du;
@@ -513,7 +521,7 @@ TEST("partialUpdateTest") {
ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size())));
CallBackUpdate ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca);
+ VisitorUP visitor = tls.createVisitor("test1", ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(5, 7) );
for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -522,7 +530,7 @@ TEST("partialUpdateTest") {
ASSERT_TRUE( ca.hasSerial(7) );
CallBackUpdate ca1;
- TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1);
+ VisitorUP visitor1 = tls.createVisitor("test1", ca1);
ASSERT_TRUE(visitor1.get());
ASSERT_TRUE( visitor1->visit(4, 5) );
for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -530,7 +538,7 @@ TEST("partialUpdateTest") {
ASSERT_TRUE( ca1.map().size() == 0);
CallBackUpdate ca2;
- TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2);
+ VisitorUP visitor2 = tls.createVisitor("test1", ca2);
ASSERT_TRUE(visitor2.get());
ASSERT_TRUE( visitor2->visit(5, 6) );
for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -538,7 +546,7 @@ TEST("partialUpdateTest") {
ASSERT_TRUE( ca2.map().size() == 0);
CallBackUpdate ca3;
- TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3);
+ VisitorUP visitor3 = tls.createVisitor("test1", ca3);
ASSERT_TRUE(visitor3.get());
ASSERT_TRUE( visitor3->visit(5, 1000) );
for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -562,7 +570,7 @@ TEST("testRemove") {
vespalib::string name("test-delete");
createDomainTest(tls, name);
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ SessionUP s1 = openDomainTest(tls, name);
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
ASSERT_TRUE(tls.remove(name));
@@ -577,7 +585,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain,
uint64_t expCount, uint64_t expInOrder)
{
CallBackStatsTest ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor(domain, ca);
+ VisitorUP visitor = tls.createVisitor(domain, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(visitStart, visitEnd) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) {
@@ -591,9 +599,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain,
}
void
-assertStatus(TransLogClient::Session &s,
- SerialNum expFirstSerial, SerialNum expLastSerial,
- uint64_t expCount)
+assertStatus(Session &s, SerialNum expFirstSerial, SerialNum expLastSerial, uint64_t expCount)
{
SerialNum b(0), e(0);
size_t c(0);
@@ -618,7 +624,7 @@ TEST("test sending a lot of data") {
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, MANY, 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SessionUP s1 = openDomainTest(tls, MANY);
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
SerialNum b(0), e(0);
size_t c(0);
@@ -627,7 +633,7 @@ TEST("test sending a lot of data") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca);
+ VisitorUP visitor = tls.createVisitor("many", ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -640,7 +646,7 @@ TEST("test sending a lot of data") {
TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
+ SessionUP s1 = openDomainTest(tls, "many");
SerialNum b(0), e(0);
size_t c(0);
EXPECT_TRUE(s1->status(b, e, c));
@@ -648,7 +654,7 @@ TEST("test sending a lot of data") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ VisitorUP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -661,7 +667,7 @@ TEST("test sending a lot of data") {
TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SessionUP s1 = openDomainTest(tls, MANY);
SerialNum b(0), e(0);
size_t c(0);
EXPECT_TRUE(s1->status(b, e, c));
@@ -669,7 +675,7 @@ TEST("test sending a lot of data") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ VisitorUP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -690,7 +696,7 @@ TEST("test sending a lot of data async") {
.setChunkAgeLimit(10ms));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, MANY, 1);
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SessionUP s1 = openDomainTest(tls, MANY);
fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES);
SerialNum b(0), e(0);
size_t c(0);
@@ -699,7 +705,7 @@ TEST("test sending a lot of data async") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ VisitorUP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -712,7 +718,7 @@ TEST("test sending a lot of data async") {
TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SessionUP s1 = openDomainTest(tls, MANY);
SerialNum b(0), e(0);
size_t c(0);
EXPECT_TRUE(s1->status(b, e, c));
@@ -720,7 +726,7 @@ TEST("test sending a lot of data async") {
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ VisitorUP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -743,7 +749,7 @@ TEST("testErase") {
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "erase", 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, "erase");
+ SessionUP s1 = openDomainTest(tls, "erase");
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
}
{
@@ -751,7 +757,7 @@ TEST("testErase") {
TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, "erase");
+ SessionUP s1 = openDomainTest(tls, "erase");
// Before erase
TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES,
@@ -839,7 +845,7 @@ TEST("testSync") {
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
+ SessionUP s1 = openDomainTest(tls, "sync");
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
SerialNum syncedTo(0);
@@ -861,7 +867,7 @@ TEST("test truncate on version mismatch") {
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
+ SessionUP s1 = openDomainTest(tls, "sync");
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
EXPECT_TRUE(s1->status(fromOld, toOld, countOld));
SerialNum syncedTo(0);
@@ -880,7 +886,7 @@ TEST("test truncate on version mismatch") {
{
TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
+ SessionUP s1 = openDomainTest(tls, "sync");
uint64_t from(0), to(0);
size_t count(0);
EXPECT_TRUE(s1->status(from, to, count));
@@ -906,7 +912,7 @@ TEST("test truncation after short read") {
TransLogClient tls(tlsspec);
createDomainTest(tls, domain, 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
+ SessionUP s1 = openDomainTest(tls, domain);
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES, ENTRYSIZE);
SerialNum syncedTo(0);
@@ -920,7 +926,7 @@ TEST("test truncation after short read") {
{
TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
- TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
+ SessionUP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES);
}
{
@@ -929,14 +935,14 @@ TEST("test truncation after short read") {
{
vespalib::string filename(dir + "/truncate-0000000000000017");
FastOS_File trfile(filename.c_str());
- EXPECT_TRUE(trfile.OpenReadWrite(NULL));
+ EXPECT_TRUE(trfile.OpenReadWrite(nullptr));
trfile.SetSize(trfile.getSize() - 1);
trfile.Close();
}
{
TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
- TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
+ SessionUP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1);
}
{
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index 925f297bf48..a516fb26604 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -27,8 +27,10 @@ using search::index::DummyFileHeaderContext;
namespace search::transactionlog {
-using ClientSession = TransLogClient::Session;
-using Visitor = TransLogClient::Visitor;
+using ClientSession = client::Session;
+using client::Visitor;
+using client::TransLogClient;
+using client::RPC;
//-----------------------------------------------------------------------------
// BufferGenerator
@@ -287,7 +289,7 @@ FeederThread::doRun()
//-----------------------------------------------------------------------------
// Agent
//-----------------------------------------------------------------------------
-class Agent : public ClientSession::Callback
+class Agent : public client::Callback
{
protected:
std::string _tlsSpec;
@@ -301,12 +303,13 @@ protected:
public:
Agent(const std::string & tlsSpec, const std::string & domain,
const EntryGenerator & generator, const std::string & name, uint32_t id, bool validate) :
- ClientSession::Callback(),
+ client::Callback(),
_tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec),
- _generator(generator), _name(name), _id(id), _validate(validate) {}
- virtual ~Agent() {}
- virtual RPC::Result receive(const Packet & packet) override = 0;
- virtual void eof() override {}
+ _generator(generator), _name(name), _id(id), _validate(validate)
+ {}
+ ~Agent() override {}
+ RPC::Result receive(const Packet & packet) override = 0;
+ void eof() override {}
virtual void failed() {}
};
diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
index 5dca84a26c1..6ce7e652326 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt
@@ -2,6 +2,7 @@
vespa_add_library(searchlib_transactionlog OBJECT
SOURCES
chunks.cpp
+ client_session.cpp
common.cpp
domain.cpp
domainconfig.cpp
diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_common.h b/searchlib/src/vespa/searchlib/transactionlog/client_common.h
new file mode 100644
index 00000000000..05bb30ff368
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/client_common.h
@@ -0,0 +1,20 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace search::transactionlog { class Packet; }
+namespace search::transactionlog::client {
+
+class RPC
+{
+public:
+enum Result { OK, FULL, ERROR };
+};
+
+class Callback {
+public:
+ virtual ~Callback() = default;
+ virtual RPC::Result receive(const Packet & packet) = 0;
+ virtual void eof() { }
+};
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp
new file mode 100644
index 00000000000..8678d88b43c
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp
@@ -0,0 +1,200 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "client_session.h"
+#include "translogclient.h"
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".translog.client_session");
+
+using vespalib::LockGuard;
+using namespace std::chrono_literals;
+
+namespace search::transactionlog::client {
+
+SessionKey::SessionKey(const vespalib::string & domain, int sessionId)
+ : _domain(domain),
+ _sessionId(sessionId)
+{ }
+SessionKey::~SessionKey() = default;
+
+int
+SessionKey::cmp(const SessionKey & b) const
+{
+ int diff(strcmp(_domain.c_str(), b._domain.c_str()));
+ if (diff == 0) {
+ diff = _sessionId - b._sessionId;
+ }
+ return diff;
+}
+
+Session::Session(const vespalib::string & domain, TransLogClient & tlc)
+ : _tlc(tlc),
+ _domain(domain),
+ _sessionId(0)
+{
+}
+
+Session::~Session()
+{
+ close();
+ clear();
+}
+
+bool
+Session::commit(const vespalib::ConstBufferRef & buf)
+{
+ bool retval(true);
+ if (buf.size() != 0) {
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainCommit");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddData(buf.c_str(), buf.size());
+ int retcode = _tlc.rpc(req);
+ retval = (retcode == 0);
+ if (retval) {
+ req->SubRef();
+ } else {
+ vespalib::string msg;
+ if (req->GetReturn() != nullptr) {
+ msg = req->GetReturn()->GetValue(1)._string._str;
+ } else {
+ msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
+ }
+ req->SubRef();
+ throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str()));
+ }
+ }
+ return retval;
+}
+
+bool
+Session::status(SerialNum & b, SerialNum & e, size_t & count)
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainStatus");
+ req->GetParams()->AddString(_domain.c_str());
+ int32_t retval(_tlc.rpc(req));
+ if (retval == 0) {
+ b = req->GetReturn()->GetValue(1)._intval64;
+ e = req->GetReturn()->GetValue(2)._intval64;
+ count = req->GetReturn()->GetValue(3)._intval64;
+ }
+ req->SubRef();
+ return (retval == 0);
+}
+
+bool
+Session::erase(const SerialNum & to)
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainPrune");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddInt64(to);
+ int32_t retval(_tlc.rpc(req));
+ req->SubRef();
+ if (retval == 1) {
+ LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to);
+ }
+ return (retval == 0);
+}
+
+
+bool
+Session::sync(const SerialNum &syncTo, SerialNum &syncedTo)
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainSync");
+ FRT_Values & params = *req->GetParams();
+ params.AddString(_domain.c_str());
+ params.AddInt64(syncTo);
+ int32_t retval(_tlc.rpc(req));
+ if (retval == 0) {
+ syncedTo = req->GetReturn()->GetValue(1)._intval64;
+ }
+ req->SubRef();
+ return (retval == 0);
+}
+
+
+void
+Session::clear()
+{
+ if (_sessionId > 0) {
+ LockGuard guard(_tlc._lock);
+ _tlc._sessions.erase(SessionKey(_domain, _sessionId));
+ }
+ _sessionId = 0;
+}
+
+Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) :
+ Session(domain, tlc),
+ _callback(callBack)
+{
+}
+
+bool
+Session::init(FRT_RPCRequest *req)
+{
+ int32_t retval(_tlc.rpc(req));
+ req->SubRef();
+ if (retval > 0) {
+ clear();
+ _sessionId = retval;
+ SessionKey key(_domain, _sessionId);
+ {
+ LockGuard guard(_tlc._lock);
+ _tlc._sessions[key] = this;
+ }
+ retval = run();
+ }
+ return (retval > 0);
+}
+
+bool
+Visitor::visit(const SerialNum & from, const SerialNum & to)
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainVisit");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddInt64(from);
+ req->GetParams()->AddInt64(to);
+ return init(req);
+}
+
+bool
+Session::run()
+{
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainSessionRun");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddInt32(_sessionId);
+ int32_t retval(_tlc.rpc(req));
+ req->SubRef();
+ return (retval == 0);
+}
+
+bool
+Session::close()
+{
+ int retval(0);
+ if (_sessionId > 0) {
+ do {
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
+ req->SetMethodName("domainSessionClose");
+ req->GetParams()->AddString(_domain.c_str());
+ req->GetParams()->AddInt32(_sessionId);
+ if ( (retval = _tlc.rpc(req)) > 0) {
+ std::this_thread::sleep_for(10ms);
+ }
+ req->SubRef();
+ } while ( retval == 1 );
+ }
+ return (retval == 0);
+}
+
+Visitor::~Visitor() = default;
+
+}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.h b/searchlib/src/vespa/searchlib/transactionlog/client_session.h
new file mode 100644
index 00000000000..49f24d83aaf
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.h
@@ -0,0 +1,68 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "client_common.h"
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/util/buffer.h>
+#include <vespa/vespalib/stllike/string.h>
+
+class FRT_RPCRequest;
+
+namespace search::transactionlog::client {
+
+class TransLogClient;
+
+class SessionKey
+{
+public:
+ SessionKey(const vespalib::string & domain, int sessionId);
+ ~SessionKey();
+ bool operator < (const SessionKey & b) const { return cmp(b) < 0; }
+private:
+ int cmp(const SessionKey & b) const;
+ vespalib::string _domain;
+ int _sessionId;
+};
+
+class Session
+{
+public:
+ Session(const vespalib::string & domain, TransLogClient & tlc);
+ virtual ~Session();
+ /// You can commit data of any registered type to any channel.
+ bool commit(const vespalib::ConstBufferRef & packet);
+ /// Will erase all entries prior to <to>
+ bool erase(const SerialNum & to);
+ bool status(SerialNum & b, SerialNum & e, size_t & count);
+
+ bool sync(const SerialNum &syncTo, SerialNum &syncedTo);
+
+ virtual RPC::Result visit(const Packet & ) { return RPC::OK; }
+ virtual void eof() { }
+ bool close();
+ void clear();
+ const vespalib::string & getDomain() const { return _domain; }
+ const TransLogClient & getTLC() const { return _tlc; }
+protected:
+ bool init(FRT_RPCRequest * req);
+ bool run();
+ TransLogClient & _tlc;
+ vespalib::string _domain;
+ int _sessionId;
+};
+
+/// Here you connect to the incomming data getting everything from <from>
+class Visitor : public Session
+{
+public:
+ Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
+ bool visit(const SerialNum & from, const SerialNum & to);
+ virtual ~Visitor() override;
+ RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); }
+ void eof() override { _callback.eof(); }
+private:
+ Callback & _callback;
+};
+
+}
+
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index c5427c5b401..8dba5d448f8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -10,14 +10,7 @@
namespace search::transactionlog {
/// This represents a type of the entry. Fx update,remove
-typedef uint32_t Type;
-/// A channel represents one data stream.
-
-class RPC
-{
-public:
-enum Result { OK, FULL, ERROR };
-};
+using Type = uint32_t;
class SerialNumRange
{
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
index 2c6c1e249f4..84919a59a97 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
@@ -1,12 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogclient.h"
+#include "common.h"
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fnet/transport.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fastos/thread.h>
-#include <thread>
+
#include <vespa/log/log.h>
LOG_SETUP(".translogclient");
@@ -15,7 +17,7 @@ using namespace std::chrono_literals;
VESPA_THREAD_STACK_TAG(translogclient_rpc_callback)
-namespace search::transactionlog {
+namespace search::transactionlog::client {
namespace {
const double NEVER(-1.0);
@@ -33,7 +35,7 @@ struct RpcTask : public vespalib::Executor::Task {
req->Return();
req = nullptr;
}
- ~RpcTask() {
+ ~RpcTask() override {
if (req != nullptr) {
req->SetError(FRTE_RPC_METHOD_FAILED, "client has been shut down");
req->Return();
@@ -46,13 +48,13 @@ struct RpcTask : public vespalib::Executor::Task {
using vespalib::LockGuard;
TransLogClient::TransLogClient(const vespalib::string & rpcTarget) :
- _executor(1, 128 * 1024, translogclient_rpc_callback),
+ _executor(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, translogclient_rpc_callback)),
_rpcTarget(rpcTarget),
_sessions(),
_threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
_transport(std::make_unique<FNET_Transport>()),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
- _target(NULL)
+ _target(nullptr)
{
reconnect();
exportRPC(*_supervisor);
@@ -62,29 +64,33 @@ TransLogClient::TransLogClient(const vespalib::string & rpcTarget) :
TransLogClient::~TransLogClient()
{
disconnect();
- _executor.shutdown().sync();
+ _executor->shutdown().sync();
_transport->ShutDown(true);
}
-bool TransLogClient::reconnect()
+bool
+TransLogClient::reconnect()
{
disconnect();
_target = _supervisor->Get2WayTarget(_rpcTarget.c_str());
return isConnected();
}
-bool TransLogClient::isConnected() const {
- return (_target != NULL) && _target->IsValid();
+bool
+TransLogClient::isConnected() const {
+ return (_target != nullptr) && _target->IsValid();
}
-void TransLogClient::disconnect()
+void
+TransLogClient::disconnect()
{
if (_target) {
_target->SubRef();
}
}
-bool TransLogClient::create(const vespalib::string & domain)
+bool
+TransLogClient::create(const vespalib::string & domain)
{
FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("createDomain");
@@ -94,7 +100,8 @@ bool TransLogClient::create(const vespalib::string & domain)
return (retval == 0);
}
-bool TransLogClient::remove(const vespalib::string & domain)
+bool
+TransLogClient::remove(const vespalib::string & domain)
{
FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("deleteDomain");
@@ -104,27 +111,28 @@ bool TransLogClient::remove(const vespalib::string & domain)
return (retval == 0);
}
-TransLogClient::Session::UP TransLogClient::open(const vespalib::string & domain)
+std::unique_ptr<Session>
+TransLogClient::open(const vespalib::string & domain)
{
- Session::UP session;
FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("openDomain");
req->GetParams()->AddString(domain.c_str());
int32_t retval(rpc(req));
+ req->SubRef();
if (retval == 0) {
- session.reset(new Session(domain, *this));
+ return std::make_unique<Session>(domain, *this);
}
- req->SubRef();
- return session;
+ return std::unique_ptr<Session>();
}
-TransLogClient::Visitor::UP
-TransLogClient::createVisitor(const vespalib::string & domain, TransLogClient::Session::Callback & callBack)
+std::unique_ptr<Visitor>
+TransLogClient::createVisitor(const vespalib::string & domain, Callback & callBack)
{
- return TransLogClient::Visitor::UP(new Visitor(domain, *this, callBack));
+ return std::make_unique<Visitor>(domain, *this, callBack);
}
-bool TransLogClient::listDomains(std::vector<vespalib::string> & dir)
+bool
+TransLogClient::listDomains(std::vector<vespalib::string> & dir)
{
FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("listDomains");
@@ -139,7 +147,8 @@ bool TransLogClient::listDomains(std::vector<vespalib::string> & dir)
return (retval == 0);
}
-int32_t TransLogClient::rpc(FRT_RPCRequest * req)
+int32_t
+TransLogClient::rpc(FRT_RPCRequest * req)
{
int32_t retval(-7);
if (_target) {
@@ -156,15 +165,17 @@ int32_t TransLogClient::rpc(FRT_RPCRequest * req)
return retval;
}
-TransLogClient::Session * TransLogClient::findSession(const vespalib::string & domainName, int sessionId)
+Session *
+TransLogClient::findSession(const vespalib::string & domainName, int sessionId)
{
SessionKey key(domainName, sessionId);
SessionMap::iterator found(_sessions.find(key));
- Session * session((found != _sessions.end()) ? found->second : NULL);
+ Session * session((found != _sessions.end()) ? found->second : nullptr);
return session;
}
-void TransLogClient::exportRPC(FRT_Supervisor & supervisor)
+void
+TransLogClient::exportRPC(FRT_Supervisor & supervisor)
{
FRT_ReflectionBuilder rb( & supervisor);
@@ -185,7 +196,8 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor)
}
-void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
+void
+TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -194,7 +206,7 @@ void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
int32_t sessionId(params[1]._intval32);
LOG(spam, "visitCallback(%s, %d)(%d)", domainName, sessionId, params[2]._data._len);
Session * session(findSession(domainName, sessionId));
- if (session != NULL) {
+ if (session != nullptr) {
Packet packet(params[2]._data._buf, params[2]._data._len);
retval = session->visit(packet);
}
@@ -202,7 +214,8 @@ void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
LOG(debug, "visitCallback(%s, %d)=%d done", domainName, sessionId, retval);
}
-void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
+void
+TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -211,7 +224,7 @@ void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
int32_t sessionId(params[1]._intval32);
LOG(debug, "eofCallback(%s, %d)", domainName, sessionId);
Session * session(findSession(domainName, sessionId));
- if (session != NULL) {
+ if (session != nullptr) {
session->eof();
retval = 0;
}
@@ -219,183 +232,16 @@ void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
LOG(debug, "eofCallback(%s, %d)=%d done", domainName, sessionId, retval);
}
-void TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req)
-{
- _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); }));
-}
-
-void TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req)
-{
- _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); }));
-}
-
-
-TransLogClient::Session::Session(const vespalib::string & domain, TransLogClient & tlc) :
- _tlc(tlc),
- _domain(domain),
- _sessionId(0)
-{
-}
-
-TransLogClient::Session::~Session()
-{
- close();
- clear();
-}
-
-bool TransLogClient::Session::commit(const vespalib::ConstBufferRef & buf)
-{
- bool retval(true);
- if (buf.size() != 0) {
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainCommit");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddData(buf.c_str(), buf.size());
- int retcode = _tlc.rpc(req);
- retval = (retcode == 0);
- if (retval) {
- req->SubRef();
- } else {
- vespalib::string msg;
- if (req->GetReturn() != 0) {
- msg = req->GetReturn()->GetValue(1)._string._str;
- } else {
- msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
- }
- req->SubRef();
- throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str()));
- }
- }
- return retval;
-}
-
-bool TransLogClient::Session::status(SerialNum & b, SerialNum & e, size_t & count)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainStatus");
- req->GetParams()->AddString(_domain.c_str());
- int32_t retval(_tlc.rpc(req));
- if (retval == 0) {
- b = req->GetReturn()->GetValue(1)._intval64;
- e = req->GetReturn()->GetValue(2)._intval64;
- count = req->GetReturn()->GetValue(3)._intval64;
- }
- req->SubRef();
- return (retval == 0);
-}
-
-bool TransLogClient::Session::erase(const SerialNum & to)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainPrune");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt64(to);
- int32_t retval(_tlc.rpc(req));
- req->SubRef();
- if (retval == 1) {
- LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to);
- }
- return (retval == 0);
-}
-
-
-bool
-TransLogClient::Session::sync(const SerialNum &syncTo, SerialNum &syncedTo)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainSync");
- FRT_Values & params = *req->GetParams();
- params.AddString(_domain.c_str());
- params.AddInt64(syncTo);
- int32_t retval(_tlc.rpc(req));
- if (retval == 0) {
- syncedTo = req->GetReturn()->GetValue(1)._intval64;
- }
- req->SubRef();
- return (retval == 0);
-}
-
-
-void TransLogClient::Session::clear()
-{
- if (_sessionId > 0) {
- LockGuard guard(_tlc._lock);
- _tlc._sessions.erase(SessionKey(_domain, _sessionId));
- }
- _sessionId = 0;
-}
-
-int TransLogClient::SessionKey::cmp(const TransLogClient::SessionKey & b) const
+void
+TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req)
{
- int diff(strcmp(_domain.c_str(), b._domain.c_str()));
- if (diff == 0) {
- diff = _sessionId - b._sessionId;
- }
- return diff;
+ _executor->execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); }));
}
-TransLogClient::Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) :
- Session(domain, tlc),
- _callback(callBack)
+void
+TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req)
{
+ _executor->execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); }));
}
-bool TransLogClient::Session::init(FRT_RPCRequest *req)
-{
- int32_t retval(_tlc.rpc(req));
- req->SubRef();
- if (retval > 0) {
- clear();
- _sessionId = retval;
- SessionKey key(_domain, _sessionId);
- {
- LockGuard guard(_tlc._lock);
- _tlc._sessions[key] = this;
- }
- retval = run();
- }
- return (retval > 0);
-}
-
-bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to)
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainVisit");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt64(from);
- req->GetParams()->AddInt64(to);
- return init(req);
-}
-
-bool TransLogClient::Session::run()
-{
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainSessionRun");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt32(_sessionId);
- int32_t retval(_tlc.rpc(req));
- req->SubRef();
- return (retval == 0);
-}
-
-bool TransLogClient::Session::close()
-{
- int retval(0);
- if (_sessionId > 0) {
- do {
- FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
- req->SetMethodName("domainSessionClose");
- req->GetParams()->AddString(_domain.c_str());
- req->GetParams()->AddInt32(_sessionId);
- if ( (retval = _tlc.rpc(req)) > 0) {
- std::this_thread::sleep_for(10ms);
- }
- req->SubRef();
- } while ( retval == 1 );
- }
- return (retval == 0);
-}
-
-TransLogClient::Visitor::~Visitor() = default;
-
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
index 38c30cd5b4c..289a0fcb8c0 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
@@ -1,11 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "common.h"
-#include <vespa/document/util/bytebuffer.h>
+#include "client_common.h"
+#include "client_session.h"
#include <vespa/vespalib/util/sync.h>
-#include <vespa/vespalib/util/buffer.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fnet/frt/invokable.h>
#include <map>
#include <vector>
@@ -13,90 +11,39 @@
class FNET_Transport;
class FRT_Supervisor;
class FRT_Target;
+class FastOS_ThreadPool;
-namespace search::transactionlog {
+namespace vespalib { class ThreadStackExecutorBase; }
+namespace search::transactionlog::client {
+
+class Session;
+class Visitor;
class TransLogClient : private FRT_Invokable
{
-private:
- TransLogClient(const TransLogClient &);
- TransLogClient& operator=(const TransLogClient &);
-
public:
- class Session
- {
- public:
- class Callback {
- public:
- virtual ~Callback() { }
- virtual RPC::Result receive(const Packet & packet) = 0;
- virtual void eof() { }
- };
- public:
- typedef std::unique_ptr<Session> UP;
- typedef std::shared_ptr<Session> SP;
-
- Session(const vespalib::string & domain, TransLogClient & tlc);
- virtual ~Session();
- /// You can commit data of any registered type to any channel.
- bool commit(const vespalib::ConstBufferRef & packet);
- /// Will erase all entries prior to <to>
- bool erase(const SerialNum & to);
- bool status(SerialNum & b, SerialNum & e, size_t & count);
-
- bool sync(const SerialNum &syncTo, SerialNum &syncedTo);
-
- virtual RPC::Result visit(const Packet & ) { return RPC::OK; }
- virtual void eof() { }
- bool close();
- void clear();
- const vespalib::string & getDomain() const { return _domain; }
- const TransLogClient & getTLC() const { return _tlc; }
- protected:
- bool init(FRT_RPCRequest * req);
- bool run();
- TransLogClient & _tlc;
- vespalib::string _domain;
- int _sessionId;
- };
- /// Here you connect to the incomming data getting everything from <from>
- class Visitor : public Session
- {
- public:
- typedef std::unique_ptr<Visitor> UP;
- typedef std::shared_ptr<Visitor> SP;
-
- Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
- bool visit(const SerialNum & from, const SerialNum & to);
- virtual ~Visitor();
- RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); }
- void eof() override { _callback.eof(); }
- private:
- Callback & _callback;
- };
- /// Here you read the incomming data getting everything from <from>
-
-public:
- typedef std::unique_ptr<TransLogClient> UP;
-
TransLogClient(const vespalib::string & rpctarget);
- virtual ~TransLogClient();
+ TransLogClient(const TransLogClient &) = delete;
+ TransLogClient& operator=(const TransLogClient &) = delete;
+ ~TransLogClient() override;
/// Here you create a new domain
bool create(const vespalib::string & domain);
/// Here you remove a domain
bool remove(const vespalib::string & domain);
/// Here you open an existing domain
- Session::UP open(const vespalib::string & domain);
+ std::unique_ptr<Session> open(const vespalib::string & domain);
/// Here you can get a list of available domains.
bool listDomains(std::vector<vespalib::string> & dir);
- Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack);
+ std::unique_ptr<Visitor> createVisitor(const vespalib::string & domain, Callback & callBack);
bool isConnected() const;
void disconnect();
bool reconnect();
const vespalib::string &getRPCTarget() const { return _rpcTarget; }
private:
+ friend Session;
+ friend Visitor;
void exportRPC(FRT_Supervisor & supervisor);
void do_visitCallbackRPC(FRT_RPCRequest *req);
void do_eofCallbackRPC(FRT_RPCRequest *req);
@@ -105,22 +52,11 @@ private:
int32_t rpc(FRT_RPCRequest * req);
Session * findSession(const vespalib::string & domain, int sessionId);
- class SessionKey
- {
- public:
- SessionKey(const vespalib::string & domain, int sessionId) : _domain(domain), _sessionId(sessionId) { }
- bool operator < (const SessionKey & b) const { return cmp(b) < 0; }
- private:
- int cmp(const SessionKey & b) const;
- vespalib::string _domain;
- int _sessionId;
- };
-
- typedef std::map< SessionKey, Session * > SessionMap;
+ using SessionMap = std::map< SessionKey, Session * >;
- vespalib::ThreadStackExecutor _executor;
- vespalib::string _rpcTarget;
- SessionMap _sessions;
+ std::unique_ptr<vespalib::ThreadStackExecutorBase> _executor;
+ vespalib::string _rpcTarget;
+ SessionMap _sessions;
//Brute force lock for subscriptions. For multithread safety.
vespalib::Lock _lock;
std::unique_ptr<FastOS_ThreadPool> _threadPool;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index a72731b661e..64e472520a5 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogserver.h"
#include "domain.h"
+#include "client_common.h"
#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/io/fileutil.h>
@@ -399,12 +400,12 @@ public:
private:
bool send(FRT_RPCRequest * req) {
int32_t retval = rpc(req);
- if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
+ if ( ! ((retval == client::RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
}
req->SubRef();
- return (retval == RPC::OK);
+ return (retval == client::RPC::OK);
}
int32_t rpc(FRT_RPCRequest * req) {
int32_t retval(-7);