diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-15 11:32:16 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-09-15 11:33:09 +0000 |
commit | af3f5cd865a24507271c1525c61fc97119a1ce83 (patch) | |
tree | e20169f10c1a2c1ecd6e655e309f8d9db403c58a /searchcore | |
parent | f500f24745747137bad942c0c8c40db56be6c49e (diff) |
Decouple code and reduce code visibility.
Diffstat (limited to 'searchcore')
11 files changed, 74 insertions, 56 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index 2f34292ad52..a7c53293981 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -286,7 +286,7 @@ public: * and dispatches each packet entry to the ReplayPacketDispatcher that * transforms them into concrete operations. */ -class VisitorCallback : public TransLogClient::Session::Callback +class VisitorCallback : public client::Callback { private: ReplayPacketDispatcher _dispatcher; @@ -298,7 +298,7 @@ public: _eof(false) { } - virtual RPC::Result receive(const Packet &packet) override { + client::RPC::Result receive(const Packet &packet) override { vespalib::nbostream_longlivedbuf handle(packet.getHandle().data(), packet.getHandle().size()); try { while (handle.size() > 0) { @@ -309,11 +309,11 @@ public: } catch (const std::exception &e) { std::cerr << "Error while handling transaction log packet: '" << std::string(e.what()) << "'" << std::endl; - return RPC::ERROR; + return client::RPC::ERROR; } - return RPC::OK; + return client::RPC::OK; } - virtual void eof() override { _eof = true; } + void eof() override { _eof = true; } bool isEof() const { return _eof; } }; @@ -371,7 +371,7 @@ protected: const BaseOptions &_bopts; DummyFileHeaderContext _fileHeader; TransLogServer _server; - TransLogClient _client; + client::TransLogClient _client; public: BaseUtility(const BaseOptions &bopts) @@ -416,7 +416,7 @@ public: _client.listDomains(domains); std::cout << "Listing status for " << domains.size() << " domain(s):" << std::endl; for (size_t i = 0; i < domains.size(); ++i) { - TransLogClient::Session::UP session = _client.open(domains[i]); + std::unique_ptr<client::Session> session = _client.open(domains[i]); SerialNum first; SerialNum last; size_t count; @@ -484,7 +484,7 @@ protected: DocTypeRepo repo(_oopts.configDir); IReplayPacketHandlerUP handler = createHandler(repo.docTypeRepo); VisitorCallback callback(*handler); - TransLogClient::Visitor::UP visitor = _client.createVisitor(_oopts.domainName, callback); + std::unique_ptr<client::Visitor> visitor = _client.createVisitor(_oopts.domainName, callback); bool visitOk = visitor->visit(_oopts.firstSerialNum-1, _oopts.lastSerialNum); if (!visitOk) { std::cerr << "Visiting domain '" << _oopts.domainName << "' [" << _oopts.firstSerialNum << "," diff --git a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp index edb4250ce76..2df7c5d629d 100644 --- a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp +++ b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp @@ -11,7 +11,7 @@ #include <vespa/vespalib/util/stringfmt.h> using search::index::DummyFileHeaderContext; -using search::transactionlog::TransLogClient; +using search::transactionlog::client::TransLogClient; using search::transactionlog::TransLogServer; using proton::DocTypeName; using proton::ProtonDiskLayout; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index a1afd7a0bb8..8a117f112b4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -19,6 +19,7 @@ #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/searchlib/common/gatecallback.h> +#include <vespa/searchlib/transactionlog/client_session.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/lambdatask.h> #include <unistd.h> @@ -331,7 +332,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService, IReplayConfig &replayConfig, search::transactionlog::Writer & tlsDirectWriter, TlsWriter * tlsWriter) - : search::transactionlog::TransLogClient::Session::Callback(), + : search::transactionlog::client::Callback(), IDocumentMoveHandler(), IPruneRemovedDocumentsHandler(), IHeartBeatHandler(), diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index b7fc2733f49..d70cefb288b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -13,7 +13,7 @@ #include <vespa/document/bucket/bucketid.h> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/searchcore/proton/common/feedtoken.h> -#include <vespa/searchlib/transactionlog/translogclient.h> +#include <vespa/searchlib/transactionlog/client_common.h> #include <shared_mutex> namespace searchcorespi::index { struct IThreadingService; } @@ -43,7 +43,7 @@ namespace bucketdb { class IBucketDBHandler; } * Class handling all aspects of feeding for a document database. * In addition to regular feeding this also includes handling the transaction log. */ -class FeedHandler: private search::transactionlog::TransLogClient::Session::Callback, +class FeedHandler: private search::transactionlog::client::Callback, public IDocumentMoveHandler, public IPruneRemovedDocumentsHandler, public IHeartBeatHandler, @@ -52,7 +52,7 @@ class FeedHandler: private search::transactionlog::TransLogClient::Session::Call { private: using Packet = search::transactionlog::Packet; - using RPC = search::transactionlog::RPC; + using RPC = search::transactionlog::client::RPC; using SerialNum = search::SerialNum; using Timestamp = storage::spi::Timestamp; using BucketId = document::BucketId; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index d01c25d9c1e..5214e13fe79 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -16,7 +16,7 @@ LOG_SETUP(".proton.server.feedstates"); using search::transactionlog::Packet; -using search::transactionlog::RPC; +using search::transactionlog::client::RPC; using search::SerialNum; using vespalib::Executor; using vespalib::makeClosure; diff --git a/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h b/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h index 6224b3b693a..c36652ec847 100644 --- a/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h +++ b/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h @@ -4,6 +4,7 @@ #include "tls_replay_progress.h" #include <vespa/searchlib/transactionlog/common.h> +#include <vespa/searchlib/transactionlog/client_common.h> #include <vespa/vespalib/util/gate.h> namespace proton { @@ -16,14 +17,14 @@ struct PacketWrapper { const search::transactionlog::Packet &packet; TlsReplayProgress *progress; - search::transactionlog::RPC::Result result; + search::transactionlog::client::RPC::Result result; vespalib::Gate gate; PacketWrapper(const search::transactionlog::Packet &p, TlsReplayProgress *progress_) : packet(p), progress(progress_), - result(search::transactionlog::RPC::ERROR), + result(search::transactionlog::client::RPC::ERROR), gate() { } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp index 31fd44eec5e..23289296ada 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp @@ -2,15 +2,13 @@ #include "proton_disk_layout.h" #include <vespa/vespalib/io/fileutil.h> -#include <vespa/fastos/file.h> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/searchlib/transactionlog/translogclient.h> -#include <cassert> #include <vespa/log/log.h> LOG_SETUP(".proton.server.proton_disk_layout"); -using search::transactionlog::TransLogClient; +using search::transactionlog::client::TransLogClient; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp index 3ad98cba3ac..fdc9b6d7807 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp @@ -2,6 +2,7 @@ #include "configstore.h" #include "transactionlogmanager.h" +#include <vespa/searchlib/transactionlog/translogclient.h> #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/exceptions.h> @@ -11,11 +12,11 @@ LOG_SETUP(".proton.server.transactionlogmanager"); using vespalib::IllegalStateException; using vespalib::make_string; -using search::transactionlog::TransLogClient; +using search::transactionlog::client::TransLogClient; +using search::transactionlog::client::Session; namespace proton { - void TransactionLogManager::doLogReplayComplete(const vespalib::string &domainName, vespalib::duration elapsedTime) const @@ -45,10 +46,8 @@ TransactionLogManager::init(SerialNum oldestConfigSerial, SerialNum &prunedSeria namespace { -void getStatus(TransLogClient::Session & session, - search::SerialNum & serialBegin, - search::SerialNum & serialEnd, - size_t & count) +void +getStatus(Session & session, search::SerialNum & serialBegin, search::SerialNum & serialEnd, size_t & count) { if (!session.status(serialBegin, serialEnd, count)) { throw IllegalStateException( @@ -66,7 +65,7 @@ void getStatus(TransLogClient & client, search::SerialNum & serialEnd, size_t & count) { - TransLogClient::Session::UP session = client.open(domainName); + std::unique_ptr<Session> session = client.open(domainName); if ( ! session) { throw IllegalStateException( make_string( @@ -117,7 +116,7 @@ TransactionLogManager::prepareReplay(TransLogClient &client, TlsReplayProgress::UP TransactionLogManager::startReplay(SerialNum first, SerialNum syncToken, - TransLogClient::Session::Callback &callback) + Callback &callback) { assert( !_visitor); _visitor = createTlcVisitor(callback); @@ -142,7 +141,7 @@ TransactionLogManager::startReplay(SerialNum first, getDomainName().c_str(), first, syncToken, getRpcTarget().c_str())); } - return TlsReplayProgress::UP(new TlsReplayProgress(getDomainName(), first, syncToken)); + return std::make_unique<TlsReplayProgress>(getDomainName(), first, syncToken); } diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h index 58444351e8f..32532e3f656 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h @@ -13,7 +13,7 @@ struct ConfigStore; **/ class TransactionLogManager : public TransactionLogManagerBase { - TransLogClient::Visitor::UP _visitor; + std::unique_ptr<Visitor> _visitor; void doLogReplayComplete(const vespalib::string &domainName, vespalib::duration elapsedTime) const override; @@ -51,7 +51,7 @@ public: /** * Start replay of the transaction log. **/ - TlsReplayProgress::UP startReplay(SerialNum first, SerialNum syncToken, TransLogClient::Session::Callback &callback); + TlsReplayProgress::UP startReplay(SerialNum first, SerialNum syncToken, Callback &callback); /** * Indicate that replay is done. diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp index 8b18a7ae566..a8ecb2ba07b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp @@ -1,19 +1,20 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "transactionlogmanagerbase.h" +#include <vespa/searchlib/transactionlog/translogclient.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/log.h> LOG_SETUP(".proton.server.transactionlogmanagerbase"); -using search::transactionlog::TransLogClient; +using search::transactionlog::client::Visitor; namespace proton { TransactionLogManagerBase::TransactionLogManagerBase( const vespalib::string &tlsSpec, const vespalib::string &domainName) : - _tlc(tlsSpec), + _tlc(std::make_unique<TransLogClient>(tlsSpec)), _tlcSession(), _domainName(domainName), _replayLock(), @@ -29,31 +30,31 @@ TransactionLogManagerBase::~TransactionLogManagerBase() = default; TransactionLogManagerBase::StatusResult TransactionLogManagerBase::init() { - TransLogClient::Session::UP session = _tlc.open(_domainName); + std::unique_ptr<Session> session = _tlc->open(_domainName); if ( ! session) { - if (!_tlc.create(_domainName)) { + if (!_tlc->create(_domainName)) { vespalib::string str = vespalib::make_string( "Failed creating domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); throw std::runtime_error(str); } LOG(debug, "Created domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); - session = _tlc.open(_domainName); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); + session = _tlc->open(_domainName); if ( ! session) { vespalib::string str = vespalib::make_string( "Could not open session for domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); throw std::runtime_error(str); } } LOG(debug, "Opened domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); StatusResult res; if (!session->status(res.serialBegin, res.serialEnd, res.count)) { vespalib::string str = vespalib::make_string( "Could not get status from session with domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); throw std::runtime_error(str); } LOG(debug, @@ -72,7 +73,8 @@ TransactionLogManagerBase::internalStartReplay() _replayStopWatch = vespalib::Timer(); } -void TransactionLogManagerBase::changeReplayDone() +void +TransactionLogManagerBase::changeReplayDone() { std::lock_guard<std::mutex> guard(_replayLock); _replayDone = true; @@ -101,23 +103,31 @@ TransactionLogManagerBase::close() } } -TransLogClient::Visitor::UP -TransactionLogManagerBase::createTlcVisitor(TransLogClient::Session::Callback &callback) { - return _tlc.createVisitor(_domainName, callback); +std::unique_ptr<Visitor> +TransactionLogManagerBase::createTlcVisitor(Callback &callback) { + return _tlc->createVisitor(_domainName, callback); } -bool TransactionLogManagerBase::getReplayDone() const { +bool +TransactionLogManagerBase::getReplayDone() const { std::lock_guard<std::mutex> guard(_replayLock); return _replayDone; } -bool TransactionLogManagerBase::isDoingReplay() const { +bool +TransactionLogManagerBase::isDoingReplay() const { std::lock_guard<std::mutex> guard(_replayLock); return _replayStarted && !_replayDone; } -void TransactionLogManagerBase::logReplayComplete() const { +void +TransactionLogManagerBase::logReplayComplete() const { doLogReplayComplete(_domainName, _replayStopWatch.elapsed()); } +const vespalib::string & +TransactionLogManagerBase::getRpcTarget() const { + return _tlc->getRPCTarget(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h index 4b5d001a28e..7059604dfe7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h @@ -2,11 +2,17 @@ #pragma once -#include <vespa/searchlib/transactionlog/translogclient.h> +#include <vespa/searchlib/common/serialnum.h> #include <vespa/vespalib/util/time.h> #include <mutex> #include <condition_variable> +namespace search::transactionlog::client { + class TransLogClient; + class Session; + class Visitor; + class Callback; +} namespace proton { /** @@ -14,10 +20,13 @@ namespace proton { **/ class TransactionLogManagerBase { protected: - using TransLogClient = search::transactionlog::TransLogClient; + using TransLogClient = search::transactionlog::client::TransLogClient; + using Session = search::transactionlog::client::Session; + using Visitor = search::transactionlog::client::Visitor; + using Callback = search::transactionlog::client::Callback; private: - TransLogClient _tlc; - TransLogClient::Session::UP _tlcSession; + std::unique_ptr<TransLogClient> _tlc; + std::unique_ptr<Session> _tlcSession; vespalib::string _domainName; mutable std::mutex _replayLock; mutable std::condition_variable _replayCond; @@ -26,7 +35,7 @@ private: vespalib::Timer _replayStopWatch; protected: - typedef search::SerialNum SerialNum; + using SerialNum = search::SerialNum; struct StatusResult { SerialNum serialBegin; @@ -55,17 +64,17 @@ public: void changeReplayDone(); void close(); - TransLogClient::Visitor::UP createTlcVisitor(TransLogClient::Session::Callback &callback); + std::unique_ptr<Visitor> createTlcVisitor(Callback &callback); void waitForReplayDone() const; - TransLogClient &getClient() { return _tlc; } - TransLogClient::Session *getSession() { return _tlcSession.get(); } + TransLogClient &getClient() { return *_tlc; } + Session *getSession() { return _tlcSession.get(); } const vespalib::string &getDomainName() const { return _domainName; } bool getReplayDone() const; bool isDoingReplay() const; void logReplayComplete() const; - const vespalib::string &getRpcTarget() const { return _tlc.getRPCTarget(); } + const vespalib::string &getRpcTarget() const; }; } // namespace proton |