summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-15 11:32:16 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-15 11:33:09 +0000
commitaf3f5cd865a24507271c1525c61fc97119a1ce83 (patch)
treee20169f10c1a2c1ecd6e655e309f8d9db403c58a /searchcore
parentf500f24745747137bad942c0c8c40db56be6c49e (diff)
Decouple code and reduce code visibility.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp16
-rw-r--r--searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstates.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/packetwrapper.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h27
11 files changed, 74 insertions, 56 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
index 2f34292ad52..a7c53293981 100644
--- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
+++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp
@@ -286,7 +286,7 @@ public:
* and dispatches each packet entry to the ReplayPacketDispatcher that
* transforms them into concrete operations.
*/
-class VisitorCallback : public TransLogClient::Session::Callback
+class VisitorCallback : public client::Callback
{
private:
ReplayPacketDispatcher _dispatcher;
@@ -298,7 +298,7 @@ public:
_eof(false)
{
}
- virtual RPC::Result receive(const Packet &packet) override {
+ client::RPC::Result receive(const Packet &packet) override {
vespalib::nbostream_longlivedbuf handle(packet.getHandle().data(), packet.getHandle().size());
try {
while (handle.size() > 0) {
@@ -309,11 +309,11 @@ public:
} catch (const std::exception &e) {
std::cerr << "Error while handling transaction log packet: '"
<< std::string(e.what()) << "'" << std::endl;
- return RPC::ERROR;
+ return client::RPC::ERROR;
}
- return RPC::OK;
+ return client::RPC::OK;
}
- virtual void eof() override { _eof = true; }
+ void eof() override { _eof = true; }
bool isEof() const { return _eof; }
};
@@ -371,7 +371,7 @@ protected:
const BaseOptions &_bopts;
DummyFileHeaderContext _fileHeader;
TransLogServer _server;
- TransLogClient _client;
+ client::TransLogClient _client;
public:
BaseUtility(const BaseOptions &bopts)
@@ -416,7 +416,7 @@ public:
_client.listDomains(domains);
std::cout << "Listing status for " << domains.size() << " domain(s):" << std::endl;
for (size_t i = 0; i < domains.size(); ++i) {
- TransLogClient::Session::UP session = _client.open(domains[i]);
+ std::unique_ptr<client::Session> session = _client.open(domains[i]);
SerialNum first;
SerialNum last;
size_t count;
@@ -484,7 +484,7 @@ protected:
DocTypeRepo repo(_oopts.configDir);
IReplayPacketHandlerUP handler = createHandler(repo.docTypeRepo);
VisitorCallback callback(*handler);
- TransLogClient::Visitor::UP visitor = _client.createVisitor(_oopts.domainName, callback);
+ std::unique_ptr<client::Visitor> visitor = _client.createVisitor(_oopts.domainName, callback);
bool visitOk = visitor->visit(_oopts.firstSerialNum-1, _oopts.lastSerialNum);
if (!visitOk) {
std::cerr << "Visiting domain '" << _oopts.domainName << "' [" << _oopts.firstSerialNum << ","
diff --git a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp
index edb4250ce76..2df7c5d629d 100644
--- a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp
+++ b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp
@@ -11,7 +11,7 @@
#include <vespa/vespalib/util/stringfmt.h>
using search::index::DummyFileHeaderContext;
-using search::transactionlog::TransLogClient;
+using search::transactionlog::client::TransLogClient;
using search::transactionlog::TransLogServer;
using proton::DocTypeName;
using proton::ProtonDiskLayout;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index a1afd7a0bb8..8a117f112b4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -19,6 +19,7 @@
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/searchcorespi/index/ithreadingservice.h>
#include <vespa/searchlib/common/gatecallback.h>
+#include <vespa/searchlib/transactionlog/client_session.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <unistd.h>
@@ -331,7 +332,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
IReplayConfig &replayConfig,
search::transactionlog::Writer & tlsDirectWriter,
TlsWriter * tlsWriter)
- : search::transactionlog::TransLogClient::Session::Callback(),
+ : search::transactionlog::client::Callback(),
IDocumentMoveHandler(),
IPruneRemovedDocumentsHandler(),
IHeartBeatHandler(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index b7fc2733f49..d70cefb288b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -13,7 +13,7 @@
#include <vespa/document/bucket/bucketid.h>
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
-#include <vespa/searchlib/transactionlog/translogclient.h>
+#include <vespa/searchlib/transactionlog/client_common.h>
#include <shared_mutex>
namespace searchcorespi::index { struct IThreadingService; }
@@ -43,7 +43,7 @@ namespace bucketdb { class IBucketDBHandler; }
* Class handling all aspects of feeding for a document database.
* In addition to regular feeding this also includes handling the transaction log.
*/
-class FeedHandler: private search::transactionlog::TransLogClient::Session::Callback,
+class FeedHandler: private search::transactionlog::client::Callback,
public IDocumentMoveHandler,
public IPruneRemovedDocumentsHandler,
public IHeartBeatHandler,
@@ -52,7 +52,7 @@ class FeedHandler: private search::transactionlog::TransLogClient::Session::Call
{
private:
using Packet = search::transactionlog::Packet;
- using RPC = search::transactionlog::RPC;
+ using RPC = search::transactionlog::client::RPC;
using SerialNum = search::SerialNum;
using Timestamp = storage::spi::Timestamp;
using BucketId = document::BucketId;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
index d01c25d9c1e..5214e13fe79 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp
@@ -16,7 +16,7 @@
LOG_SETUP(".proton.server.feedstates");
using search::transactionlog::Packet;
-using search::transactionlog::RPC;
+using search::transactionlog::client::RPC;
using search::SerialNum;
using vespalib::Executor;
using vespalib::makeClosure;
diff --git a/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h b/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h
index 6224b3b693a..c36652ec847 100644
--- a/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h
+++ b/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h
@@ -4,6 +4,7 @@
#include "tls_replay_progress.h"
#include <vespa/searchlib/transactionlog/common.h>
+#include <vespa/searchlib/transactionlog/client_common.h>
#include <vespa/vespalib/util/gate.h>
namespace proton {
@@ -16,14 +17,14 @@ struct PacketWrapper {
const search::transactionlog::Packet &packet;
TlsReplayProgress *progress;
- search::transactionlog::RPC::Result result;
+ search::transactionlog::client::RPC::Result result;
vespalib::Gate gate;
PacketWrapper(const search::transactionlog::Packet &p,
TlsReplayProgress *progress_)
: packet(p),
progress(progress_),
- result(search::transactionlog::RPC::ERROR),
+ result(search::transactionlog::client::RPC::ERROR),
gate()
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp
index 31fd44eec5e..23289296ada 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp
@@ -2,15 +2,13 @@
#include "proton_disk_layout.h"
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/fastos/file.h>
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/searchlib/transactionlog/translogclient.h>
-#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.proton_disk_layout");
-using search::transactionlog::TransLogClient;
+using search::transactionlog::client::TransLogClient;
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
index 3ad98cba3ac..fdc9b6d7807 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp
@@ -2,6 +2,7 @@
#include "configstore.h"
#include "transactionlogmanager.h"
+#include <vespa/searchlib/transactionlog/translogclient.h>
#include <vespa/searchcore/proton/common/eventlogger.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -11,11 +12,11 @@ LOG_SETUP(".proton.server.transactionlogmanager");
using vespalib::IllegalStateException;
using vespalib::make_string;
-using search::transactionlog::TransLogClient;
+using search::transactionlog::client::TransLogClient;
+using search::transactionlog::client::Session;
namespace proton {
-
void
TransactionLogManager::doLogReplayComplete(const vespalib::string &domainName,
vespalib::duration elapsedTime) const
@@ -45,10 +46,8 @@ TransactionLogManager::init(SerialNum oldestConfigSerial, SerialNum &prunedSeria
namespace {
-void getStatus(TransLogClient::Session & session,
- search::SerialNum & serialBegin,
- search::SerialNum & serialEnd,
- size_t & count)
+void
+getStatus(Session & session, search::SerialNum & serialBegin, search::SerialNum & serialEnd, size_t & count)
{
if (!session.status(serialBegin, serialEnd, count)) {
throw IllegalStateException(
@@ -66,7 +65,7 @@ void getStatus(TransLogClient & client,
search::SerialNum & serialEnd,
size_t & count)
{
- TransLogClient::Session::UP session = client.open(domainName);
+ std::unique_ptr<Session> session = client.open(domainName);
if ( ! session) {
throw IllegalStateException(
make_string(
@@ -117,7 +116,7 @@ TransactionLogManager::prepareReplay(TransLogClient &client,
TlsReplayProgress::UP
TransactionLogManager::startReplay(SerialNum first,
SerialNum syncToken,
- TransLogClient::Session::Callback &callback)
+ Callback &callback)
{
assert( !_visitor);
_visitor = createTlcVisitor(callback);
@@ -142,7 +141,7 @@ TransactionLogManager::startReplay(SerialNum first,
getDomainName().c_str(),
first, syncToken, getRpcTarget().c_str()));
}
- return TlsReplayProgress::UP(new TlsReplayProgress(getDomainName(), first, syncToken));
+ return std::make_unique<TlsReplayProgress>(getDomainName(), first, syncToken);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
index 58444351e8f..32532e3f656 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h
@@ -13,7 +13,7 @@ struct ConfigStore;
**/
class TransactionLogManager : public TransactionLogManagerBase
{
- TransLogClient::Visitor::UP _visitor;
+ std::unique_ptr<Visitor> _visitor;
void doLogReplayComplete(const vespalib::string &domainName, vespalib::duration elapsedTime) const override;
@@ -51,7 +51,7 @@ public:
/**
* Start replay of the transaction log.
**/
- TlsReplayProgress::UP startReplay(SerialNum first, SerialNum syncToken, TransLogClient::Session::Callback &callback);
+ TlsReplayProgress::UP startReplay(SerialNum first, SerialNum syncToken, Callback &callback);
/**
* Indicate that replay is done.
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
index 8b18a7ae566..a8ecb2ba07b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp
@@ -1,19 +1,20 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "transactionlogmanagerbase.h"
+#include <vespa/searchlib/transactionlog/translogclient.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/log/log.h>
LOG_SETUP(".proton.server.transactionlogmanagerbase");
-using search::transactionlog::TransLogClient;
+using search::transactionlog::client::Visitor;
namespace proton {
TransactionLogManagerBase::TransactionLogManagerBase(
const vespalib::string &tlsSpec, const vespalib::string &domainName) :
- _tlc(tlsSpec),
+ _tlc(std::make_unique<TransLogClient>(tlsSpec)),
_tlcSession(),
_domainName(domainName),
_replayLock(),
@@ -29,31 +30,31 @@ TransactionLogManagerBase::~TransactionLogManagerBase() = default;
TransactionLogManagerBase::StatusResult
TransactionLogManagerBase::init()
{
- TransLogClient::Session::UP session = _tlc.open(_domainName);
+ std::unique_ptr<Session> session = _tlc->open(_domainName);
if ( ! session) {
- if (!_tlc.create(_domainName)) {
+ if (!_tlc->create(_domainName)) {
vespalib::string str = vespalib::make_string(
"Failed creating domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
throw std::runtime_error(str);
}
LOG(debug, "Created domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
- session = _tlc.open(_domainName);
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
+ session = _tlc->open(_domainName);
if ( ! session) {
vespalib::string str = vespalib::make_string(
"Could not open session for domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
throw std::runtime_error(str);
}
}
LOG(debug, "Opened domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
StatusResult res;
if (!session->status(res.serialBegin, res.serialEnd, res.count)) {
vespalib::string str = vespalib::make_string(
"Could not get status from session with domain '%s' on TLS '%s'",
- _domainName.c_str(), _tlc.getRPCTarget().c_str());
+ _domainName.c_str(), _tlc->getRPCTarget().c_str());
throw std::runtime_error(str);
}
LOG(debug,
@@ -72,7 +73,8 @@ TransactionLogManagerBase::internalStartReplay()
_replayStopWatch = vespalib::Timer();
}
-void TransactionLogManagerBase::changeReplayDone()
+void
+TransactionLogManagerBase::changeReplayDone()
{
std::lock_guard<std::mutex> guard(_replayLock);
_replayDone = true;
@@ -101,23 +103,31 @@ TransactionLogManagerBase::close()
}
}
-TransLogClient::Visitor::UP
-TransactionLogManagerBase::createTlcVisitor(TransLogClient::Session::Callback &callback) {
- return _tlc.createVisitor(_domainName, callback);
+std::unique_ptr<Visitor>
+TransactionLogManagerBase::createTlcVisitor(Callback &callback) {
+ return _tlc->createVisitor(_domainName, callback);
}
-bool TransactionLogManagerBase::getReplayDone() const {
+bool
+TransactionLogManagerBase::getReplayDone() const {
std::lock_guard<std::mutex> guard(_replayLock);
return _replayDone;
}
-bool TransactionLogManagerBase::isDoingReplay() const {
+bool
+TransactionLogManagerBase::isDoingReplay() const {
std::lock_guard<std::mutex> guard(_replayLock);
return _replayStarted && !_replayDone;
}
-void TransactionLogManagerBase::logReplayComplete() const {
+void
+TransactionLogManagerBase::logReplayComplete() const {
doLogReplayComplete(_domainName, _replayStopWatch.elapsed());
}
+const vespalib::string &
+TransactionLogManagerBase::getRpcTarget() const {
+ return _tlc->getRPCTarget();
+}
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
index 4b5d001a28e..7059604dfe7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
+++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h
@@ -2,11 +2,17 @@
#pragma once
-#include <vespa/searchlib/transactionlog/translogclient.h>
+#include <vespa/searchlib/common/serialnum.h>
#include <vespa/vespalib/util/time.h>
#include <mutex>
#include <condition_variable>
+namespace search::transactionlog::client {
+ class TransLogClient;
+ class Session;
+ class Visitor;
+ class Callback;
+}
namespace proton {
/**
@@ -14,10 +20,13 @@ namespace proton {
**/
class TransactionLogManagerBase {
protected:
- using TransLogClient = search::transactionlog::TransLogClient;
+ using TransLogClient = search::transactionlog::client::TransLogClient;
+ using Session = search::transactionlog::client::Session;
+ using Visitor = search::transactionlog::client::Visitor;
+ using Callback = search::transactionlog::client::Callback;
private:
- TransLogClient _tlc;
- TransLogClient::Session::UP _tlcSession;
+ std::unique_ptr<TransLogClient> _tlc;
+ std::unique_ptr<Session> _tlcSession;
vespalib::string _domainName;
mutable std::mutex _replayLock;
mutable std::condition_variable _replayCond;
@@ -26,7 +35,7 @@ private:
vespalib::Timer _replayStopWatch;
protected:
- typedef search::SerialNum SerialNum;
+ using SerialNum = search::SerialNum;
struct StatusResult {
SerialNum serialBegin;
@@ -55,17 +64,17 @@ public:
void changeReplayDone();
void close();
- TransLogClient::Visitor::UP createTlcVisitor(TransLogClient::Session::Callback &callback);
+ std::unique_ptr<Visitor> createTlcVisitor(Callback &callback);
void waitForReplayDone() const;
- TransLogClient &getClient() { return _tlc; }
- TransLogClient::Session *getSession() { return _tlcSession.get(); }
+ TransLogClient &getClient() { return *_tlc; }
+ Session *getSession() { return _tlcSession.get(); }
const vespalib::string &getDomainName() const { return _domainName; }
bool getReplayDone() const;
bool isDoingReplay() const;
void logReplayComplete() const;
- const vespalib::string &getRpcTarget() const { return _tlc.getRPCTarget(); }
+ const vespalib::string &getRpcTarget() const;
};
} // namespace proton