diff options
21 files changed, 506 insertions, 414 deletions
diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index 2f34292ad52..a7c53293981 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -286,7 +286,7 @@ public: * and dispatches each packet entry to the ReplayPacketDispatcher that * transforms them into concrete operations. */ -class VisitorCallback : public TransLogClient::Session::Callback +class VisitorCallback : public client::Callback { private: ReplayPacketDispatcher _dispatcher; @@ -298,7 +298,7 @@ public: _eof(false) { } - virtual RPC::Result receive(const Packet &packet) override { + client::RPC::Result receive(const Packet &packet) override { vespalib::nbostream_longlivedbuf handle(packet.getHandle().data(), packet.getHandle().size()); try { while (handle.size() > 0) { @@ -309,11 +309,11 @@ public: } catch (const std::exception &e) { std::cerr << "Error while handling transaction log packet: '" << std::string(e.what()) << "'" << std::endl; - return RPC::ERROR; + return client::RPC::ERROR; } - return RPC::OK; + return client::RPC::OK; } - virtual void eof() override { _eof = true; } + void eof() override { _eof = true; } bool isEof() const { return _eof; } }; @@ -371,7 +371,7 @@ protected: const BaseOptions &_bopts; DummyFileHeaderContext _fileHeader; TransLogServer _server; - TransLogClient _client; + client::TransLogClient _client; public: BaseUtility(const BaseOptions &bopts) @@ -416,7 +416,7 @@ public: _client.listDomains(domains); std::cout << "Listing status for " << domains.size() << " domain(s):" << std::endl; for (size_t i = 0; i < domains.size(); ++i) { - TransLogClient::Session::UP session = _client.open(domains[i]); + std::unique_ptr<client::Session> session = _client.open(domains[i]); SerialNum first; SerialNum last; size_t count; @@ -484,7 +484,7 @@ protected: DocTypeRepo repo(_oopts.configDir); IReplayPacketHandlerUP handler = createHandler(repo.docTypeRepo); VisitorCallback callback(*handler); - TransLogClient::Visitor::UP visitor = _client.createVisitor(_oopts.domainName, callback); + std::unique_ptr<client::Visitor> visitor = _client.createVisitor(_oopts.domainName, callback); bool visitOk = visitor->visit(_oopts.firstSerialNum-1, _oopts.lastSerialNum); if (!visitOk) { std::cerr << "Visiting domain '" << _oopts.domainName << "' [" << _oopts.firstSerialNum << "," diff --git a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp index edb4250ce76..2df7c5d629d 100644 --- a/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp +++ b/searchcore/src/tests/proton/proton_disk_layout/proton_disk_layout_test.cpp @@ -11,7 +11,7 @@ #include <vespa/vespalib/util/stringfmt.h> using search::index::DummyFileHeaderContext; -using search::transactionlog::TransLogClient; +using search::transactionlog::client::TransLogClient; using search::transactionlog::TransLogServer; using proton::DocTypeName; using proton::ProtonDiskLayout; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index a1afd7a0bb8..8a117f112b4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -19,6 +19,7 @@ #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/searchlib/common/gatecallback.h> +#include <vespa/searchlib/transactionlog/client_session.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/lambdatask.h> #include <unistd.h> @@ -331,7 +332,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService, IReplayConfig &replayConfig, search::transactionlog::Writer & tlsDirectWriter, TlsWriter * tlsWriter) - : search::transactionlog::TransLogClient::Session::Callback(), + : search::transactionlog::client::Callback(), IDocumentMoveHandler(), IPruneRemovedDocumentsHandler(), IHeartBeatHandler(), diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index b7fc2733f49..d70cefb288b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -13,7 +13,7 @@ #include <vespa/document/bucket/bucketid.h> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/searchcore/proton/common/feedtoken.h> -#include <vespa/searchlib/transactionlog/translogclient.h> +#include <vespa/searchlib/transactionlog/client_common.h> #include <shared_mutex> namespace searchcorespi::index { struct IThreadingService; } @@ -43,7 +43,7 @@ namespace bucketdb { class IBucketDBHandler; } * Class handling all aspects of feeding for a document database. * In addition to regular feeding this also includes handling the transaction log. */ -class FeedHandler: private search::transactionlog::TransLogClient::Session::Callback, +class FeedHandler: private search::transactionlog::client::Callback, public IDocumentMoveHandler, public IPruneRemovedDocumentsHandler, public IHeartBeatHandler, @@ -52,7 +52,7 @@ class FeedHandler: private search::transactionlog::TransLogClient::Session::Call { private: using Packet = search::transactionlog::Packet; - using RPC = search::transactionlog::RPC; + using RPC = search::transactionlog::client::RPC; using SerialNum = search::SerialNum; using Timestamp = storage::spi::Timestamp; using BucketId = document::BucketId; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp index d01c25d9c1e..5214e13fe79 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedstates.cpp @@ -16,7 +16,7 @@ LOG_SETUP(".proton.server.feedstates"); using search::transactionlog::Packet; -using search::transactionlog::RPC; +using search::transactionlog::client::RPC; using search::SerialNum; using vespalib::Executor; using vespalib::makeClosure; diff --git a/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h b/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h index 6224b3b693a..c36652ec847 100644 --- a/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h +++ b/searchcore/src/vespa/searchcore/proton/server/packetwrapper.h @@ -4,6 +4,7 @@ #include "tls_replay_progress.h" #include <vespa/searchlib/transactionlog/common.h> +#include <vespa/searchlib/transactionlog/client_common.h> #include <vespa/vespalib/util/gate.h> namespace proton { @@ -16,14 +17,14 @@ struct PacketWrapper { const search::transactionlog::Packet &packet; TlsReplayProgress *progress; - search::transactionlog::RPC::Result result; + search::transactionlog::client::RPC::Result result; vespalib::Gate gate; PacketWrapper(const search::transactionlog::Packet &p, TlsReplayProgress *progress_) : packet(p), progress(progress_), - result(search::transactionlog::RPC::ERROR), + result(search::transactionlog::client::RPC::ERROR), gate() { } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp index 31fd44eec5e..23289296ada 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_disk_layout.cpp @@ -2,15 +2,13 @@ #include "proton_disk_layout.h" #include <vespa/vespalib/io/fileutil.h> -#include <vespa/fastos/file.h> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/searchlib/transactionlog/translogclient.h> -#include <cassert> #include <vespa/log/log.h> LOG_SETUP(".proton.server.proton_disk_layout"); -using search::transactionlog::TransLogClient; +using search::transactionlog::client::TransLogClient; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp index 3ad98cba3ac..fdc9b6d7807 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.cpp @@ -2,6 +2,7 @@ #include "configstore.h" #include "transactionlogmanager.h" +#include <vespa/searchlib/transactionlog/translogclient.h> #include <vespa/searchcore/proton/common/eventlogger.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/exceptions.h> @@ -11,11 +12,11 @@ LOG_SETUP(".proton.server.transactionlogmanager"); using vespalib::IllegalStateException; using vespalib::make_string; -using search::transactionlog::TransLogClient; +using search::transactionlog::client::TransLogClient; +using search::transactionlog::client::Session; namespace proton { - void TransactionLogManager::doLogReplayComplete(const vespalib::string &domainName, vespalib::duration elapsedTime) const @@ -45,10 +46,8 @@ TransactionLogManager::init(SerialNum oldestConfigSerial, SerialNum &prunedSeria namespace { -void getStatus(TransLogClient::Session & session, - search::SerialNum & serialBegin, - search::SerialNum & serialEnd, - size_t & count) +void +getStatus(Session & session, search::SerialNum & serialBegin, search::SerialNum & serialEnd, size_t & count) { if (!session.status(serialBegin, serialEnd, count)) { throw IllegalStateException( @@ -66,7 +65,7 @@ void getStatus(TransLogClient & client, search::SerialNum & serialEnd, size_t & count) { - TransLogClient::Session::UP session = client.open(domainName); + std::unique_ptr<Session> session = client.open(domainName); if ( ! session) { throw IllegalStateException( make_string( @@ -117,7 +116,7 @@ TransactionLogManager::prepareReplay(TransLogClient &client, TlsReplayProgress::UP TransactionLogManager::startReplay(SerialNum first, SerialNum syncToken, - TransLogClient::Session::Callback &callback) + Callback &callback) { assert( !_visitor); _visitor = createTlcVisitor(callback); @@ -142,7 +141,7 @@ TransactionLogManager::startReplay(SerialNum first, getDomainName().c_str(), first, syncToken, getRpcTarget().c_str())); } - return TlsReplayProgress::UP(new TlsReplayProgress(getDomainName(), first, syncToken)); + return std::make_unique<TlsReplayProgress>(getDomainName(), first, syncToken); } diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h index 58444351e8f..32532e3f656 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanager.h @@ -13,7 +13,7 @@ struct ConfigStore; **/ class TransactionLogManager : public TransactionLogManagerBase { - TransLogClient::Visitor::UP _visitor; + std::unique_ptr<Visitor> _visitor; void doLogReplayComplete(const vespalib::string &domainName, vespalib::duration elapsedTime) const override; @@ -51,7 +51,7 @@ public: /** * Start replay of the transaction log. **/ - TlsReplayProgress::UP startReplay(SerialNum first, SerialNum syncToken, TransLogClient::Session::Callback &callback); + TlsReplayProgress::UP startReplay(SerialNum first, SerialNum syncToken, Callback &callback); /** * Indicate that replay is done. diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp index 8b18a7ae566..a8ecb2ba07b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp @@ -1,19 +1,20 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "transactionlogmanagerbase.h" +#include <vespa/searchlib/transactionlog/translogclient.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/log.h> LOG_SETUP(".proton.server.transactionlogmanagerbase"); -using search::transactionlog::TransLogClient; +using search::transactionlog::client::Visitor; namespace proton { TransactionLogManagerBase::TransactionLogManagerBase( const vespalib::string &tlsSpec, const vespalib::string &domainName) : - _tlc(tlsSpec), + _tlc(std::make_unique<TransLogClient>(tlsSpec)), _tlcSession(), _domainName(domainName), _replayLock(), @@ -29,31 +30,31 @@ TransactionLogManagerBase::~TransactionLogManagerBase() = default; TransactionLogManagerBase::StatusResult TransactionLogManagerBase::init() { - TransLogClient::Session::UP session = _tlc.open(_domainName); + std::unique_ptr<Session> session = _tlc->open(_domainName); if ( ! session) { - if (!_tlc.create(_domainName)) { + if (!_tlc->create(_domainName)) { vespalib::string str = vespalib::make_string( "Failed creating domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); throw std::runtime_error(str); } LOG(debug, "Created domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); - session = _tlc.open(_domainName); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); + session = _tlc->open(_domainName); if ( ! session) { vespalib::string str = vespalib::make_string( "Could not open session for domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); throw std::runtime_error(str); } } LOG(debug, "Opened domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); StatusResult res; if (!session->status(res.serialBegin, res.serialEnd, res.count)) { vespalib::string str = vespalib::make_string( "Could not get status from session with domain '%s' on TLS '%s'", - _domainName.c_str(), _tlc.getRPCTarget().c_str()); + _domainName.c_str(), _tlc->getRPCTarget().c_str()); throw std::runtime_error(str); } LOG(debug, @@ -72,7 +73,8 @@ TransactionLogManagerBase::internalStartReplay() _replayStopWatch = vespalib::Timer(); } -void TransactionLogManagerBase::changeReplayDone() +void +TransactionLogManagerBase::changeReplayDone() { std::lock_guard<std::mutex> guard(_replayLock); _replayDone = true; @@ -101,23 +103,31 @@ TransactionLogManagerBase::close() } } -TransLogClient::Visitor::UP -TransactionLogManagerBase::createTlcVisitor(TransLogClient::Session::Callback &callback) { - return _tlc.createVisitor(_domainName, callback); +std::unique_ptr<Visitor> +TransactionLogManagerBase::createTlcVisitor(Callback &callback) { + return _tlc->createVisitor(_domainName, callback); } -bool TransactionLogManagerBase::getReplayDone() const { +bool +TransactionLogManagerBase::getReplayDone() const { std::lock_guard<std::mutex> guard(_replayLock); return _replayDone; } -bool TransactionLogManagerBase::isDoingReplay() const { +bool +TransactionLogManagerBase::isDoingReplay() const { std::lock_guard<std::mutex> guard(_replayLock); return _replayStarted && !_replayDone; } -void TransactionLogManagerBase::logReplayComplete() const { +void +TransactionLogManagerBase::logReplayComplete() const { doLogReplayComplete(_domainName, _replayStopWatch.elapsed()); } +const vespalib::string & +TransactionLogManagerBase::getRpcTarget() const { + return _tlc->getRPCTarget(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h index 4b5d001a28e..7059604dfe7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h @@ -2,11 +2,17 @@ #pragma once -#include <vespa/searchlib/transactionlog/translogclient.h> +#include <vespa/searchlib/common/serialnum.h> #include <vespa/vespalib/util/time.h> #include <mutex> #include <condition_variable> +namespace search::transactionlog::client { + class TransLogClient; + class Session; + class Visitor; + class Callback; +} namespace proton { /** @@ -14,10 +20,13 @@ namespace proton { **/ class TransactionLogManagerBase { protected: - using TransLogClient = search::transactionlog::TransLogClient; + using TransLogClient = search::transactionlog::client::TransLogClient; + using Session = search::transactionlog::client::Session; + using Visitor = search::transactionlog::client::Visitor; + using Callback = search::transactionlog::client::Callback; private: - TransLogClient _tlc; - TransLogClient::Session::UP _tlcSession; + std::unique_ptr<TransLogClient> _tlc; + std::unique_ptr<Session> _tlcSession; vespalib::string _domainName; mutable std::mutex _replayLock; mutable std::condition_variable _replayCond; @@ -26,7 +35,7 @@ private: vespalib::Timer _replayStopWatch; protected: - typedef search::SerialNum SerialNum; + using SerialNum = search::SerialNum; struct StatusResult { SerialNum serialBegin; @@ -55,17 +64,17 @@ public: void changeReplayDone(); void close(); - TransLogClient::Visitor::UP createTlcVisitor(TransLogClient::Session::Callback &callback); + std::unique_ptr<Visitor> createTlcVisitor(Callback &callback); void waitForReplayDone() const; - TransLogClient &getClient() { return _tlc; } - TransLogClient::Session *getSession() { return _tlcSession.get(); } + TransLogClient &getClient() { return *_tlc; } + Session *getSession() { return _tlcSession.get(); } const vespalib::string &getDomainName() const { return _domainName; } bool getReplayDone() const; bool isDoingReplay() const; void logReplayComplete() const; - const vespalib::string &getRpcTarget() const { return _tlc.getRPCTarget(); } + const vespalib::string &getRpcTarget() const; }; } // namespace proton diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index a20e0cc3aaa..e6cf44cb697 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/objects/identifiable.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> +#include <vespa/document/util/bytebuffer.h> #include <vespa/fastos/file.h> #include <vespa/log/log.h> @@ -15,17 +16,24 @@ using namespace document; using namespace vespalib; using namespace std::chrono_literals; using search::index::DummyFileHeaderContext; +using search::transactionlog::client::TransLogClient; +using search::transactionlog::client::Session; +using search::transactionlog::client::Visitor; +using search::transactionlog::client::RPC; +using search::transactionlog::client::Callback; +using SessionUP = std::unique_ptr<Session>; +using VisitorUP = std::unique_ptr<Visitor>; namespace { bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); -TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); -bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); -void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); -void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); +SessionUP openDomainTest(TransLogClient & tls, const vespalib::string & name); +bool fillDomainTest(Session * s1, const vespalib::string & name); +void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries); +void fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); uint32_t countFiles(const vespalib::string &dir); -void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); -bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); +void checkFilledDomainTest(const SessionUP &s1, size_t numEntries); +bool visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & name); void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains); void verifyDomain(const vespalib::string & name); @@ -43,7 +51,7 @@ myhex(const void * b, size_t sz) return s; } -class CallBackTest : public TransLogClient::Visitor::Callback +class CallBackTest : public Callback { private: virtual RPC::Result receive(const Packet & packet) override; @@ -74,7 +82,7 @@ CallBackTest::receive(const Packet & p) return RPC::OK; } -class CallBackManyTest : public TransLogClient::Visitor::Callback +class CallBackManyTest : public Callback { private: virtual RPC::Result receive(const Packet & packet) override; @@ -103,7 +111,7 @@ CallBackManyTest::receive(const Packet & p) return RPC::OK; } -class CallBackUpdate : public TransLogClient::Visitor::Callback +class CallBackUpdate : public Callback { public: typedef std::map<SerialNum, Identifiable *> PacketMap; @@ -153,7 +161,7 @@ CallBackUpdate::receive(const Packet & packet) return RPC::OK; } -class CallBackStatsTest : public TransLogClient::Session::Callback +class CallBackStatsTest : public Callback { private: virtual RPC::Result receive(const Packet & packet) override; @@ -219,8 +227,8 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre std::vector<vespalib::string> dir; tls.listDomains(dir); EXPECT_EQUAL (dir.size(), preExistingDomains); - TransLogClient::Session::UP s1 = tls.open(name); - ASSERT_TRUE (s1.get() == NULL); + SessionUP s1 = tls.open(name); + ASSERT_FALSE (s1); retval = tls.create(name); ASSERT_TRUE (retval); dir.clear(); @@ -230,16 +238,16 @@ createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t pre return retval; } -TransLogClient::Session::UP +SessionUP openDomainTest(TransLogClient & tls, const vespalib::string & name) { - TransLogClient::Session::UP s1 = tls.open(name); - ASSERT_TRUE (s1.get() != NULL); + SessionUP s1 = tls.open(name); + ASSERT_TRUE (s1); return s1; } bool -fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) +fillDomainTest(Session * s1, const vespalib::string & name) { bool retval(true); Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20)); @@ -279,7 +287,7 @@ fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) } void -fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) +fillDomainTest(Session * s1, size_t numPackets, size_t numEntries) { size_t value(0); for(size_t i=0; i < numPackets; i++) { @@ -333,7 +341,7 @@ fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numP void -fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) +fillDomainTest(Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) { size_t value(0); std::vector<char> entryBuffer(entrySize); @@ -368,7 +376,7 @@ countFiles(const vespalib::string &dir) } void -checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) +checkFilledDomainTest(const SessionUP &s1, size_t numEntries) { SerialNum b(0), e(0); size_t c(0); @@ -379,7 +387,7 @@ checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) } bool -visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) +visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & name) { bool retval(true); @@ -391,7 +399,7 @@ visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespal EXPECT_EQUAL(c, 3u); CallBackTest ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor(name, ca); + VisitorUP visitor = tls.createVisitor(name, ca); ASSERT_TRUE(visitor.get()); EXPECT_TRUE( visitor->visit(0, 1) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -451,7 +459,7 @@ void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_ TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, name, preExistingDomains); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); } @@ -459,7 +467,7 @@ void verifyDomain(const vespalib::string & name) { DummyFileHeaderContext fileHeaderContext; TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); } @@ -472,7 +480,7 @@ TEST("testVisitOverGeneratedDomain") { vespalib::string name("test1"); createDomainTest(tls, name); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); EXPECT_EQUAL(0, getMaxSessionRunTime(tlss, "test1")); visitDomainTest(tls, s1.get(), name); @@ -488,7 +496,7 @@ TEST("testVisitOverPreExistingDomain") { TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); visitDomainTest(tls, s1.get(), name); } @@ -497,8 +505,8 @@ TEST("partialUpdateTest") { TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); - TransLogClient::Session & session = *s1; + SessionUP s1 = openDomainTest(tls, "test1"); + Session & session = *s1; TestIdentifiable du; @@ -513,7 +521,7 @@ TEST("partialUpdateTest") { ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size()))); CallBackUpdate ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); + VisitorUP visitor = tls.createVisitor("test1", ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(5, 7) ); for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -522,7 +530,7 @@ TEST("partialUpdateTest") { ASSERT_TRUE( ca.hasSerial(7) ); CallBackUpdate ca1; - TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); + VisitorUP visitor1 = tls.createVisitor("test1", ca1); ASSERT_TRUE(visitor1.get()); ASSERT_TRUE( visitor1->visit(4, 5) ); for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -530,7 +538,7 @@ TEST("partialUpdateTest") { ASSERT_TRUE( ca1.map().size() == 0); CallBackUpdate ca2; - TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); + VisitorUP visitor2 = tls.createVisitor("test1", ca2); ASSERT_TRUE(visitor2.get()); ASSERT_TRUE( visitor2->visit(5, 6) ); for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -538,7 +546,7 @@ TEST("partialUpdateTest") { ASSERT_TRUE( ca2.map().size() == 0); CallBackUpdate ca3; - TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); + VisitorUP visitor3 = tls.createVisitor("test1", ca3); ASSERT_TRUE(visitor3.get()); ASSERT_TRUE( visitor3->visit(5, 1000) ); for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -562,7 +570,7 @@ TEST("testRemove") { vespalib::string name("test-delete"); createDomainTest(tls, name); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); + SessionUP s1 = openDomainTest(tls, name); fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); ASSERT_TRUE(tls.remove(name)); @@ -577,7 +585,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain, uint64_t expCount, uint64_t expInOrder) { CallBackStatsTest ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor(domain, ca); + VisitorUP visitor = tls.createVisitor(domain, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(visitStart, visitEnd) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { @@ -591,9 +599,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain, } void -assertStatus(TransLogClient::Session &s, - SerialNum expFirstSerial, SerialNum expLastSerial, - uint64_t expCount) +assertStatus(Session &s, SerialNum expFirstSerial, SerialNum expLastSerial, uint64_t expCount) { SerialNum b(0), e(0); size_t c(0); @@ -618,7 +624,7 @@ TEST("test sending a lot of data") { TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, MANY, 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SessionUP s1 = openDomainTest(tls, MANY); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); @@ -627,7 +633,7 @@ TEST("test sending a lot of data") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); + VisitorUP visitor = tls.createVisitor("many", ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -640,7 +646,7 @@ TEST("test sending a lot of data") { TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); + SessionUP s1 = openDomainTest(tls, "many"); SerialNum b(0), e(0); size_t c(0); EXPECT_TRUE(s1->status(b, e, c)); @@ -648,7 +654,7 @@ TEST("test sending a lot of data") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + VisitorUP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -661,7 +667,7 @@ TEST("test sending a lot of data") { TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SessionUP s1 = openDomainTest(tls, MANY); SerialNum b(0), e(0); size_t c(0); EXPECT_TRUE(s1->status(b, e, c)); @@ -669,7 +675,7 @@ TEST("test sending a lot of data") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + VisitorUP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -690,7 +696,7 @@ TEST("test sending a lot of data async") { .setChunkAgeLimit(10ms)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, MANY, 1); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SessionUP s1 = openDomainTest(tls, MANY); fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); @@ -699,7 +705,7 @@ TEST("test sending a lot of data async") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + VisitorUP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -712,7 +718,7 @@ TEST("test sending a lot of data async") { TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SessionUP s1 = openDomainTest(tls, MANY); SerialNum b(0), e(0); size_t c(0); EXPECT_TRUE(s1->status(b, e, c)); @@ -720,7 +726,7 @@ TEST("test sending a lot of data async") { EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + VisitorUP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -743,7 +749,7 @@ TEST("testErase") { TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); + SessionUP s1 = openDomainTest(tls, "erase"); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); } { @@ -751,7 +757,7 @@ TEST("testErase") { TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); + SessionUP s1 = openDomainTest(tls, "erase"); // Before erase TEST_DO(assertVisitStats(tls, "erase", 2, TOTAL_NUM_ENTRIES, @@ -839,7 +845,7 @@ TEST("testSync") { TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + SessionUP s1 = openDomainTest(tls, "sync"); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); SerialNum syncedTo(0); @@ -861,7 +867,7 @@ TEST("test truncate on version mismatch") { TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + SessionUP s1 = openDomainTest(tls, "sync"); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); EXPECT_TRUE(s1->status(fromOld, toOld, countOld)); SerialNum syncedTo(0); @@ -880,7 +886,7 @@ TEST("test truncate on version mismatch") { { TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); + SessionUP s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); size_t count(0); EXPECT_TRUE(s1->status(from, to, count)); @@ -906,7 +912,7 @@ TEST("test truncation after short read") { TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + SessionUP s1 = openDomainTest(tls, domain); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES, ENTRYSIZE); SerialNum syncedTo(0); @@ -920,7 +926,7 @@ TEST("test truncation after short read") { { TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); - TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + SessionUP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); } { @@ -929,14 +935,14 @@ TEST("test truncation after short read") { { vespalib::string filename(dir + "/truncate-0000000000000017"); FastOS_File trfile(filename.c_str()); - EXPECT_TRUE(trfile.OpenReadWrite(NULL)); + EXPECT_TRUE(trfile.OpenReadWrite(nullptr)); trfile.SetSize(trfile.getSize() - 1); trfile.Close(); } { TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); - TransLogClient::Session::UP s1 = openDomainTest(tls, domain); + SessionUP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); } { diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 925f297bf48..a516fb26604 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -27,8 +27,10 @@ using search::index::DummyFileHeaderContext; namespace search::transactionlog { -using ClientSession = TransLogClient::Session; -using Visitor = TransLogClient::Visitor; +using ClientSession = client::Session; +using client::Visitor; +using client::TransLogClient; +using client::RPC; //----------------------------------------------------------------------------- // BufferGenerator @@ -287,7 +289,7 @@ FeederThread::doRun() //----------------------------------------------------------------------------- // Agent //----------------------------------------------------------------------------- -class Agent : public ClientSession::Callback +class Agent : public client::Callback { protected: std::string _tlsSpec; @@ -301,12 +303,13 @@ protected: public: Agent(const std::string & tlsSpec, const std::string & domain, const EntryGenerator & generator, const std::string & name, uint32_t id, bool validate) : - ClientSession::Callback(), + client::Callback(), _tlsSpec(tlsSpec), _domain(domain), _client(tlsSpec), - _generator(generator), _name(name), _id(id), _validate(validate) {} - virtual ~Agent() {} - virtual RPC::Result receive(const Packet & packet) override = 0; - virtual void eof() override {} + _generator(generator), _name(name), _id(id), _validate(validate) + {} + ~Agent() override {} + RPC::Result receive(const Packet & packet) override = 0; + void eof() override {} virtual void failed() {} }; diff --git a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt index 5dca84a26c1..6ce7e652326 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/transactionlog/CMakeLists.txt @@ -2,6 +2,7 @@ vespa_add_library(searchlib_transactionlog OBJECT SOURCES chunks.cpp + client_session.cpp common.cpp domain.cpp domainconfig.cpp diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_common.h b/searchlib/src/vespa/searchlib/transactionlog/client_common.h new file mode 100644 index 00000000000..05bb30ff368 --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/client_common.h @@ -0,0 +1,20 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace search::transactionlog { class Packet; } +namespace search::transactionlog::client { + +class RPC +{ +public: +enum Result { OK, FULL, ERROR }; +}; + +class Callback { +public: + virtual ~Callback() = default; + virtual RPC::Result receive(const Packet & packet) = 0; + virtual void eof() { } +}; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp new file mode 100644 index 00000000000..8678d88b43c --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.cpp @@ -0,0 +1,200 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "client_session.h" +#include "translogclient.h" +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/supervisor.h> +#include <thread> + +#include <vespa/log/log.h> +LOG_SETUP(".translog.client_session"); + +using vespalib::LockGuard; +using namespace std::chrono_literals; + +namespace search::transactionlog::client { + +SessionKey::SessionKey(const vespalib::string & domain, int sessionId) + : _domain(domain), + _sessionId(sessionId) +{ } +SessionKey::~SessionKey() = default; + +int +SessionKey::cmp(const SessionKey & b) const +{ + int diff(strcmp(_domain.c_str(), b._domain.c_str())); + if (diff == 0) { + diff = _sessionId - b._sessionId; + } + return diff; +} + +Session::Session(const vespalib::string & domain, TransLogClient & tlc) + : _tlc(tlc), + _domain(domain), + _sessionId(0) +{ +} + +Session::~Session() +{ + close(); + clear(); +} + +bool +Session::commit(const vespalib::ConstBufferRef & buf) +{ + bool retval(true); + if (buf.size() != 0) { + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainCommit"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddData(buf.c_str(), buf.size()); + int retcode = _tlc.rpc(req); + retval = (retcode == 0); + if (retval) { + req->SubRef(); + } else { + vespalib::string msg; + if (req->GetReturn() != nullptr) { + msg = req->GetReturn()->GetValue(1)._string._str; + } else { + msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); + } + req->SubRef(); + throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str())); + } + } + return retval; +} + +bool +Session::status(SerialNum & b, SerialNum & e, size_t & count) +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainStatus"); + req->GetParams()->AddString(_domain.c_str()); + int32_t retval(_tlc.rpc(req)); + if (retval == 0) { + b = req->GetReturn()->GetValue(1)._intval64; + e = req->GetReturn()->GetValue(2)._intval64; + count = req->GetReturn()->GetValue(3)._intval64; + } + req->SubRef(); + return (retval == 0); +} + +bool +Session::erase(const SerialNum & to) +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainPrune"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddInt64(to); + int32_t retval(_tlc.rpc(req)); + req->SubRef(); + if (retval == 1) { + LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to); + } + return (retval == 0); +} + + +bool +Session::sync(const SerialNum &syncTo, SerialNum &syncedTo) +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainSync"); + FRT_Values & params = *req->GetParams(); + params.AddString(_domain.c_str()); + params.AddInt64(syncTo); + int32_t retval(_tlc.rpc(req)); + if (retval == 0) { + syncedTo = req->GetReturn()->GetValue(1)._intval64; + } + req->SubRef(); + return (retval == 0); +} + + +void +Session::clear() +{ + if (_sessionId > 0) { + LockGuard guard(_tlc._lock); + _tlc._sessions.erase(SessionKey(_domain, _sessionId)); + } + _sessionId = 0; +} + +Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) : + Session(domain, tlc), + _callback(callBack) +{ +} + +bool +Session::init(FRT_RPCRequest *req) +{ + int32_t retval(_tlc.rpc(req)); + req->SubRef(); + if (retval > 0) { + clear(); + _sessionId = retval; + SessionKey key(_domain, _sessionId); + { + LockGuard guard(_tlc._lock); + _tlc._sessions[key] = this; + } + retval = run(); + } + return (retval > 0); +} + +bool +Visitor::visit(const SerialNum & from, const SerialNum & to) +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainVisit"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddInt64(from); + req->GetParams()->AddInt64(to); + return init(req); +} + +bool +Session::run() +{ + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainSessionRun"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddInt32(_sessionId); + int32_t retval(_tlc.rpc(req)); + req->SubRef(); + return (retval == 0); +} + +bool +Session::close() +{ + int retval(0); + if (_sessionId > 0) { + do { + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); + req->SetMethodName("domainSessionClose"); + req->GetParams()->AddString(_domain.c_str()); + req->GetParams()->AddInt32(_sessionId); + if ( (retval = _tlc.rpc(req)) > 0) { + std::this_thread::sleep_for(10ms); + } + req->SubRef(); + } while ( retval == 1 ); + } + return (retval == 0); +} + +Visitor::~Visitor() = default; + +} diff --git a/searchlib/src/vespa/searchlib/transactionlog/client_session.h b/searchlib/src/vespa/searchlib/transactionlog/client_session.h new file mode 100644 index 00000000000..49f24d83aaf --- /dev/null +++ b/searchlib/src/vespa/searchlib/transactionlog/client_session.h @@ -0,0 +1,68 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "client_common.h" +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/util/buffer.h> +#include <vespa/vespalib/stllike/string.h> + +class FRT_RPCRequest; + +namespace search::transactionlog::client { + +class TransLogClient; + +class SessionKey +{ +public: + SessionKey(const vespalib::string & domain, int sessionId); + ~SessionKey(); + bool operator < (const SessionKey & b) const { return cmp(b) < 0; } +private: + int cmp(const SessionKey & b) const; + vespalib::string _domain; + int _sessionId; +}; + +class Session +{ +public: + Session(const vespalib::string & domain, TransLogClient & tlc); + virtual ~Session(); + /// You can commit data of any registered type to any channel. + bool commit(const vespalib::ConstBufferRef & packet); + /// Will erase all entries prior to <to> + bool erase(const SerialNum & to); + bool status(SerialNum & b, SerialNum & e, size_t & count); + + bool sync(const SerialNum &syncTo, SerialNum &syncedTo); + + virtual RPC::Result visit(const Packet & ) { return RPC::OK; } + virtual void eof() { } + bool close(); + void clear(); + const vespalib::string & getDomain() const { return _domain; } + const TransLogClient & getTLC() const { return _tlc; } +protected: + bool init(FRT_RPCRequest * req); + bool run(); + TransLogClient & _tlc; + vespalib::string _domain; + int _sessionId; +}; + +/// Here you connect to the incomming data getting everything from <from> +class Visitor : public Session +{ +public: + Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack); + bool visit(const SerialNum & from, const SerialNum & to); + virtual ~Visitor() override; + RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); } + void eof() override { _callback.eof(); } +private: + Callback & _callback; +}; + +} + diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index c5427c5b401..8dba5d448f8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -10,14 +10,7 @@ namespace search::transactionlog { /// This represents a type of the entry. Fx update,remove -typedef uint32_t Type; -/// A channel represents one data stream. - -class RPC -{ -public: -enum Result { OK, FULL, ERROR }; -}; +using Type = uint32_t; class SerialNumRange { diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp index 2c6c1e249f4..84919a59a97 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp @@ -1,12 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogclient.h" +#include "common.h" #include <vespa/vespalib/util/stringfmt.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/target.h> #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fnet/transport.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fastos/thread.h> -#include <thread> + #include <vespa/log/log.h> LOG_SETUP(".translogclient"); @@ -15,7 +17,7 @@ using namespace std::chrono_literals; VESPA_THREAD_STACK_TAG(translogclient_rpc_callback) -namespace search::transactionlog { +namespace search::transactionlog::client { namespace { const double NEVER(-1.0); @@ -33,7 +35,7 @@ struct RpcTask : public vespalib::Executor::Task { req->Return(); req = nullptr; } - ~RpcTask() { + ~RpcTask() override { if (req != nullptr) { req->SetError(FRTE_RPC_METHOD_FAILED, "client has been shut down"); req->Return(); @@ -46,13 +48,13 @@ struct RpcTask : public vespalib::Executor::Task { using vespalib::LockGuard; TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : - _executor(1, 128 * 1024, translogclient_rpc_callback), + _executor(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, translogclient_rpc_callback)), _rpcTarget(rpcTarget), _sessions(), _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), _transport(std::make_unique<FNET_Transport>()), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), - _target(NULL) + _target(nullptr) { reconnect(); exportRPC(*_supervisor); @@ -62,29 +64,33 @@ TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : TransLogClient::~TransLogClient() { disconnect(); - _executor.shutdown().sync(); + _executor->shutdown().sync(); _transport->ShutDown(true); } -bool TransLogClient::reconnect() +bool +TransLogClient::reconnect() { disconnect(); _target = _supervisor->Get2WayTarget(_rpcTarget.c_str()); return isConnected(); } -bool TransLogClient::isConnected() const { - return (_target != NULL) && _target->IsValid(); +bool +TransLogClient::isConnected() const { + return (_target != nullptr) && _target->IsValid(); } -void TransLogClient::disconnect() +void +TransLogClient::disconnect() { if (_target) { _target->SubRef(); } } -bool TransLogClient::create(const vespalib::string & domain) +bool +TransLogClient::create(const vespalib::string & domain) { FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("createDomain"); @@ -94,7 +100,8 @@ bool TransLogClient::create(const vespalib::string & domain) return (retval == 0); } -bool TransLogClient::remove(const vespalib::string & domain) +bool +TransLogClient::remove(const vespalib::string & domain) { FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("deleteDomain"); @@ -104,27 +111,28 @@ bool TransLogClient::remove(const vespalib::string & domain) return (retval == 0); } -TransLogClient::Session::UP TransLogClient::open(const vespalib::string & domain) +std::unique_ptr<Session> +TransLogClient::open(const vespalib::string & domain) { - Session::UP session; FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("openDomain"); req->GetParams()->AddString(domain.c_str()); int32_t retval(rpc(req)); + req->SubRef(); if (retval == 0) { - session.reset(new Session(domain, *this)); + return std::make_unique<Session>(domain, *this); } - req->SubRef(); - return session; + return std::unique_ptr<Session>(); } -TransLogClient::Visitor::UP -TransLogClient::createVisitor(const vespalib::string & domain, TransLogClient::Session::Callback & callBack) +std::unique_ptr<Visitor> +TransLogClient::createVisitor(const vespalib::string & domain, Callback & callBack) { - return TransLogClient::Visitor::UP(new Visitor(domain, *this, callBack)); + return std::make_unique<Visitor>(domain, *this, callBack); } -bool TransLogClient::listDomains(std::vector<vespalib::string> & dir) +bool +TransLogClient::listDomains(std::vector<vespalib::string> & dir) { FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("listDomains"); @@ -139,7 +147,8 @@ bool TransLogClient::listDomains(std::vector<vespalib::string> & dir) return (retval == 0); } -int32_t TransLogClient::rpc(FRT_RPCRequest * req) +int32_t +TransLogClient::rpc(FRT_RPCRequest * req) { int32_t retval(-7); if (_target) { @@ -156,15 +165,17 @@ int32_t TransLogClient::rpc(FRT_RPCRequest * req) return retval; } -TransLogClient::Session * TransLogClient::findSession(const vespalib::string & domainName, int sessionId) +Session * +TransLogClient::findSession(const vespalib::string & domainName, int sessionId) { SessionKey key(domainName, sessionId); SessionMap::iterator found(_sessions.find(key)); - Session * session((found != _sessions.end()) ? found->second : NULL); + Session * session((found != _sessions.end()) ? found->second : nullptr); return session; } -void TransLogClient::exportRPC(FRT_Supervisor & supervisor) +void +TransLogClient::exportRPC(FRT_Supervisor & supervisor) { FRT_ReflectionBuilder rb( & supervisor); @@ -185,7 +196,8 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor) } -void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) +void +TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -194,7 +206,7 @@ void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) int32_t sessionId(params[1]._intval32); LOG(spam, "visitCallback(%s, %d)(%d)", domainName, sessionId, params[2]._data._len); Session * session(findSession(domainName, sessionId)); - if (session != NULL) { + if (session != nullptr) { Packet packet(params[2]._data._buf, params[2]._data._len); retval = session->visit(packet); } @@ -202,7 +214,8 @@ void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) LOG(debug, "visitCallback(%s, %d)=%d done", domainName, sessionId, retval); } -void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) +void +TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -211,7 +224,7 @@ void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) int32_t sessionId(params[1]._intval32); LOG(debug, "eofCallback(%s, %d)", domainName, sessionId); Session * session(findSession(domainName, sessionId)); - if (session != NULL) { + if (session != nullptr) { session->eof(); retval = 0; } @@ -219,183 +232,16 @@ void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) LOG(debug, "eofCallback(%s, %d)=%d done", domainName, sessionId, retval); } -void TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req) -{ - _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); })); -} - -void TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req) -{ - _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); })); -} - - -TransLogClient::Session::Session(const vespalib::string & domain, TransLogClient & tlc) : - _tlc(tlc), - _domain(domain), - _sessionId(0) -{ -} - -TransLogClient::Session::~Session() -{ - close(); - clear(); -} - -bool TransLogClient::Session::commit(const vespalib::ConstBufferRef & buf) -{ - bool retval(true); - if (buf.size() != 0) { - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainCommit"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddData(buf.c_str(), buf.size()); - int retcode = _tlc.rpc(req); - retval = (retcode == 0); - if (retval) { - req->SubRef(); - } else { - vespalib::string msg; - if (req->GetReturn() != 0) { - msg = req->GetReturn()->GetValue(1)._string._str; - } else { - msg = vespalib::make_string("Clientside error %s: error(%d): %s", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); - } - req->SubRef(); - throw std::runtime_error(vespalib::make_string("commit failed with code %d. server says: %s", retcode, msg.c_str())); - } - } - return retval; -} - -bool TransLogClient::Session::status(SerialNum & b, SerialNum & e, size_t & count) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainStatus"); - req->GetParams()->AddString(_domain.c_str()); - int32_t retval(_tlc.rpc(req)); - if (retval == 0) { - b = req->GetReturn()->GetValue(1)._intval64; - e = req->GetReturn()->GetValue(2)._intval64; - count = req->GetReturn()->GetValue(3)._intval64; - } - req->SubRef(); - return (retval == 0); -} - -bool TransLogClient::Session::erase(const SerialNum & to) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainPrune"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt64(to); - int32_t retval(_tlc.rpc(req)); - req->SubRef(); - if (retval == 1) { - LOG(warning, "Prune to %" PRIu64 " denied since there were active visitors in that area", to); - } - return (retval == 0); -} - - -bool -TransLogClient::Session::sync(const SerialNum &syncTo, SerialNum &syncedTo) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainSync"); - FRT_Values & params = *req->GetParams(); - params.AddString(_domain.c_str()); - params.AddInt64(syncTo); - int32_t retval(_tlc.rpc(req)); - if (retval == 0) { - syncedTo = req->GetReturn()->GetValue(1)._intval64; - } - req->SubRef(); - return (retval == 0); -} - - -void TransLogClient::Session::clear() -{ - if (_sessionId > 0) { - LockGuard guard(_tlc._lock); - _tlc._sessions.erase(SessionKey(_domain, _sessionId)); - } - _sessionId = 0; -} - -int TransLogClient::SessionKey::cmp(const TransLogClient::SessionKey & b) const +void +TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req) { - int diff(strcmp(_domain.c_str(), b._domain.c_str())); - if (diff == 0) { - diff = _sessionId - b._sessionId; - } - return diff; + _executor->execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); })); } -TransLogClient::Visitor::Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack) : - Session(domain, tlc), - _callback(callBack) +void +TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req) { + _executor->execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); })); } -bool TransLogClient::Session::init(FRT_RPCRequest *req) -{ - int32_t retval(_tlc.rpc(req)); - req->SubRef(); - if (retval > 0) { - clear(); - _sessionId = retval; - SessionKey key(_domain, _sessionId); - { - LockGuard guard(_tlc._lock); - _tlc._sessions[key] = this; - } - retval = run(); - } - return (retval > 0); -} - -bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to) -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainVisit"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt64(from); - req->GetParams()->AddInt64(to); - return init(req); -} - -bool TransLogClient::Session::run() -{ - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainSessionRun"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt32(_sessionId); - int32_t retval(_tlc.rpc(req)); - req->SubRef(); - return (retval == 0); -} - -bool TransLogClient::Session::close() -{ - int retval(0); - if (_sessionId > 0) { - do { - FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); - req->SetMethodName("domainSessionClose"); - req->GetParams()->AddString(_domain.c_str()); - req->GetParams()->AddInt32(_sessionId); - if ( (retval = _tlc.rpc(req)) > 0) { - std::this_thread::sleep_for(10ms); - } - req->SubRef(); - } while ( retval == 1 ); - } - return (retval == 0); -} - -TransLogClient::Visitor::~Visitor() = default; - } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h index 38c30cd5b4c..289a0fcb8c0 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h @@ -1,11 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "common.h" -#include <vespa/document/util/bytebuffer.h> +#include "client_common.h" +#include "client_session.h" #include <vespa/vespalib/util/sync.h> -#include <vespa/vespalib/util/buffer.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fnet/frt/invokable.h> #include <map> #include <vector> @@ -13,90 +11,39 @@ class FNET_Transport; class FRT_Supervisor; class FRT_Target; +class FastOS_ThreadPool; -namespace search::transactionlog { +namespace vespalib { class ThreadStackExecutorBase; } +namespace search::transactionlog::client { + +class Session; +class Visitor; class TransLogClient : private FRT_Invokable { -private: - TransLogClient(const TransLogClient &); - TransLogClient& operator=(const TransLogClient &); - public: - class Session - { - public: - class Callback { - public: - virtual ~Callback() { } - virtual RPC::Result receive(const Packet & packet) = 0; - virtual void eof() { } - }; - public: - typedef std::unique_ptr<Session> UP; - typedef std::shared_ptr<Session> SP; - - Session(const vespalib::string & domain, TransLogClient & tlc); - virtual ~Session(); - /// You can commit data of any registered type to any channel. - bool commit(const vespalib::ConstBufferRef & packet); - /// Will erase all entries prior to <to> - bool erase(const SerialNum & to); - bool status(SerialNum & b, SerialNum & e, size_t & count); - - bool sync(const SerialNum &syncTo, SerialNum &syncedTo); - - virtual RPC::Result visit(const Packet & ) { return RPC::OK; } - virtual void eof() { } - bool close(); - void clear(); - const vespalib::string & getDomain() const { return _domain; } - const TransLogClient & getTLC() const { return _tlc; } - protected: - bool init(FRT_RPCRequest * req); - bool run(); - TransLogClient & _tlc; - vespalib::string _domain; - int _sessionId; - }; - /// Here you connect to the incomming data getting everything from <from> - class Visitor : public Session - { - public: - typedef std::unique_ptr<Visitor> UP; - typedef std::shared_ptr<Visitor> SP; - - Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack); - bool visit(const SerialNum & from, const SerialNum & to); - virtual ~Visitor(); - RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); } - void eof() override { _callback.eof(); } - private: - Callback & _callback; - }; - /// Here you read the incomming data getting everything from <from> - -public: - typedef std::unique_ptr<TransLogClient> UP; - TransLogClient(const vespalib::string & rpctarget); - virtual ~TransLogClient(); + TransLogClient(const TransLogClient &) = delete; + TransLogClient& operator=(const TransLogClient &) = delete; + ~TransLogClient() override; /// Here you create a new domain bool create(const vespalib::string & domain); /// Here you remove a domain bool remove(const vespalib::string & domain); /// Here you open an existing domain - Session::UP open(const vespalib::string & domain); + std::unique_ptr<Session> open(const vespalib::string & domain); /// Here you can get a list of available domains. bool listDomains(std::vector<vespalib::string> & dir); - Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack); + std::unique_ptr<Visitor> createVisitor(const vespalib::string & domain, Callback & callBack); bool isConnected() const; void disconnect(); bool reconnect(); const vespalib::string &getRPCTarget() const { return _rpcTarget; } private: + friend Session; + friend Visitor; void exportRPC(FRT_Supervisor & supervisor); void do_visitCallbackRPC(FRT_RPCRequest *req); void do_eofCallbackRPC(FRT_RPCRequest *req); @@ -105,22 +52,11 @@ private: int32_t rpc(FRT_RPCRequest * req); Session * findSession(const vespalib::string & domain, int sessionId); - class SessionKey - { - public: - SessionKey(const vespalib::string & domain, int sessionId) : _domain(domain), _sessionId(sessionId) { } - bool operator < (const SessionKey & b) const { return cmp(b) < 0; } - private: - int cmp(const SessionKey & b) const; - vespalib::string _domain; - int _sessionId; - }; - - typedef std::map< SessionKey, Session * > SessionMap; + using SessionMap = std::map< SessionKey, Session * >; - vespalib::ThreadStackExecutor _executor; - vespalib::string _rpcTarget; - SessionMap _sessions; + std::unique_ptr<vespalib::ThreadStackExecutorBase> _executor; + vespalib::string _rpcTarget; + SessionMap _sessions; //Brute force lock for subscriptions. For multithread safety. vespalib::Lock _lock; std::unique_ptr<FastOS_ThreadPool> _threadPool; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index a72731b661e..64e472520a5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" #include "domain.h" +#include "client_common.h" #include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> @@ -399,12 +400,12 @@ public: private: bool send(FRT_RPCRequest * req) { int32_t retval = rpc(req); - if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { + if ( ! ((retval == client::RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); } req->SubRef(); - return (retval == RPC::OK); + return (retval == client::RPC::OK); } int32_t rpc(FRT_RPCRequest * req) { int32_t retval(-7); |