diff options
6 files changed, 111 insertions, 87 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index b3b51e1de90..fc9518ccf1b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -322,12 +322,12 @@ bool Domain::erase(SerialNum to) } int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, - FRT_Supervisor & supervisor, FNET_Connection *conn) + std::unique_ptr<Session::Destination> dest) { assert(this == domain.get()); cleanSessions(); SerialNumRange range(from, to); - auto session = std::make_shared<Session>(_sessionId++, range, domain, supervisor, conn); + auto session = std::make_shared<Session>(_sessionId++, range, domain, std::move(dest)); int id = session->id(); LockGuard guard(_sessionLock); _sessions[id] = std::move(session); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index c1ff9157a6f..c0ee484926c 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -51,7 +51,7 @@ public: bool erase(SerialNum to); void commit(const Packet & packet); - int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn); + int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest); SerialNum begin() const; SerialNum end() const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index 1aacb4f842c..ef36ff980d8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "session.h" #include "domain.h" -#include <vespa/fnet/frt/supervisor.h> #include <vespa/fastlib/io/bufferedfile.h> #include <vespa/log/log.h> @@ -11,14 +10,10 @@ using vespalib::LockGuard; namespace search::transactionlog { -namespace { - const double NEVER(-1.0); -} - vespalib::Executor::Task::UP Session::createTask(const Session::SP & session) { - return Task::UP(new VisitTask(session)); + return std::make_unique<VisitTask>(session); } Session::VisitTask::VisitTask(const Session::SP & session) @@ -86,7 +81,7 @@ Session::visitOnly() } bool Session::finished() const { - return _finished || (_connection->GetState() != FNET_Connection::FNET_CONNECTED); + return _finished || ! _destination->connected(); } void @@ -99,32 +94,9 @@ Session::finalize() _finished = true; } -int32_t -Session::rpc(FRT_RPCRequest * req) -{ - int32_t retval(-7); - LOG(debug, "rpc %s starting.", req->GetMethodName()); - FRT_Supervisor::InvokeSync(_supervisor.GetTransport(), _connection, req, NEVER); - if (req->GetErrorCode() == FRTE_NO_ERROR) { - retval = (req->GetReturn()->GetValue(0)._intval32); - LOG(debug, "rpc %s = %d\n", req->GetMethodName(), retval); - } else if (req->GetErrorCode() == FRTE_RPC_TIMEOUT) { - LOG(warning, "rpc %s timed out. Will allow to continue: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); - retval = -req->GetErrorCode(); - } else { - if (req->GetErrorCode() != FRTE_RPC_CONNECTION) { - LOG(warning, "rpc %s: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); - } - retval = -req->GetErrorCode(); - _ok = false; - } - return retval; -} - Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d, - FRT_Supervisor & supervisor, FNET_Connection *conn) : - _supervisor(supervisor), - _connection(conn), + std::unique_ptr<Destination> destination) : + _destination(std::move(destination)), _domain(d), _range(r), _id(sId), @@ -134,45 +106,20 @@ Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d, _finished(false), _startTime() { - _connection->AddRef(); } -Session::~Session() -{ - _connection->SubRef(); -} +Session::~Session() = default; bool Session::send(const Packet & packet) { - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); - req->SetMethodName("visitCallback"); - req->GetParams()->AddString(_domain->name().c_str()); - req->GetParams()->AddInt32(id()); - req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size()); - return send(req); -} - -bool -Session::send(FRT_RPCRequest * req) -{ - int32_t retval = rpc(req); - if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { - LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); - } - req->SubRef(); - - return (retval == RPC::OK); + return _destination->send(_id, _domain->name(), packet); } bool Session::sendDone() { - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); - req->SetMethodName("eofCallback"); - req->GetParams()->AddString(_domain->name().c_str()); - req->GetParams()->AddInt32(id()); - bool retval(send(req)); + bool retval = _destination->sendDone(_id, _domain->name()); _inSync = true; return retval; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index 6cd31ee4d42..dde9c03d3b8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -4,9 +4,9 @@ #include "common.h" #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/sync.h> -#include <vespa/fnet/frt/invoker.h> #include <chrono> #include <deque> +#include <atomic> class FastOS_FileInterface; @@ -23,10 +23,17 @@ private: using time_point = std::chrono::time_point<std::chrono::steady_clock>; public: + class Destination { + public: + virtual ~Destination() {} + virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; + virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; + virtual bool connected() const = 0; + }; typedef std::shared_ptr<Session> SP; Session(const Session &) = delete; Session & operator = (const Session &) = delete; - Session(int sId, const SerialNumRange & r, const DomainSP & d, FRT_Supervisor & supervisor, FNET_Connection *conn); + Session(int sId, const SerialNumRange & r, const DomainSP & d, std::unique_ptr<Destination> destination); ~Session(); const SerialNumRange & range() const { return _range; } int id() const { return _id; } @@ -47,7 +54,6 @@ private: Session::SP _session; }; - bool send(FRT_RPCRequest * req); bool send(const Packet & packet); bool sendDone(); void visit(); @@ -55,17 +61,15 @@ private: void startVisit(); void finalize(); bool visit(FastOS_FileInterface & file, DomainPart & dp) __attribute__((noinline)); - int32_t rpc(FRT_RPCRequest * req); - FRT_Supervisor & _supervisor; - FNET_Connection * _connection; - DomainSP _domain; - SerialNumRange _range; - int _id; - bool _ok; - std::atomic<bool> _visitRunning; - std::atomic<bool> _inSync; - std::atomic<bool> _finished; - time_point _startTime; + std::unique_ptr<Destination> _destination; + DomainSP _domain; + SerialNumRange _range; + int _id; + bool _ok; + std::atomic<bool> _visitRunning; + std::atomic<bool> _inSync; + std::atomic<bool> _finished; + time_point _startTime; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index dd6b63f9241..c344ae7bbaa 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -4,6 +4,8 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/task.h> #include <fstream> #include <vespa/log/log.h> @@ -26,21 +28,16 @@ class SyncHandler : public FNET_Task SerialNum _syncTo; public: - SyncHandler(FRT_Supervisor *supervisor, - FRT_RPCRequest *req,const Domain::SP &domain, - const TransLogServer::Session::SP &session, - SerialNum syncTo); + SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo); ~SyncHandler(); void PerformTask() override; }; -SyncHandler::SyncHandler(FRT_Supervisor *supervisor, - FRT_RPCRequest *req, - const Domain::SP &domain, - const TransLogServer::Session::SP &session, - SerialNum syncTo) +SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo) : FNET_Task(supervisor->GetScheduler()), _req(*req), _domain(domain), @@ -325,6 +322,8 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor) namespace { +constexpr double NEVER(-1.0); + void writeDomainDir(std::lock_guard<std::mutex> &guard, vespalib::string dir, @@ -344,6 +343,77 @@ writeDomainDir(std::lock_guard<std::mutex> &guard, vespalib::File::sync(dir); } +class RPCDestination : public Session::Destination { +public: + RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection) + : _supervisor(supervisor), _connection(connection), _ok(true) + { + _connection->AddRef(); + } + ~RPCDestination() override { _connection->SubRef(); } + + bool + send(int32_t id, const vespalib::string & domain, const Packet & packet) override + { + FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); + req->SetMethodName("visitCallback"); + req->GetParams()->AddString(domain.c_str()); + req->GetParams()->AddInt32(id); + req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size()); + return send(req); + } + + bool + sendDone(int32_t id, const vespalib::string & domain) override + { + FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); + req->SetMethodName("eofCallback"); + req->GetParams()->AddString(domain.c_str()); + req->GetParams()->AddInt32(id); + bool retval(send(req)); + return retval; + } + bool connected() const override { + return (_connection->GetState() != FNET_Connection::FNET_CONNECTED); + } +private: + bool + send(FRT_RPCRequest * req) + { + int32_t retval = rpc(req); + if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { + LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); + } + req->SubRef(); + + return (retval == RPC::OK); + } + int32_t + rpc(FRT_RPCRequest * req) + { + int32_t retval(-7); + LOG(debug, "rpc %s starting.", req->GetMethodName()); + FRT_Supervisor::InvokeSync(_supervisor.GetTransport(), _connection, req, NEVER); + if (req->GetErrorCode() == FRTE_NO_ERROR) { + retval = (req->GetReturn()->GetValue(0)._intval32); + LOG(debug, "rpc %s = %d\n", req->GetMethodName(), retval); + } else if (req->GetErrorCode() == FRTE_RPC_TIMEOUT) { + LOG(warning, "rpc %s timed out. Will allow to continue: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); + retval = -req->GetErrorCode(); + } else { + if (req->GetErrorCode() != FRTE_RPC_CONNECTION) { + LOG(warning, "rpc %s: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); + } + retval = -req->GetErrorCode(); + _ok = false; + } + return retval; + } + FRT_Supervisor & _supervisor; + FNET_Connection * _connection; + bool _ok; +}; + } void TransLogServer::createDomain(FRT_RPCRequest *req) @@ -508,7 +578,7 @@ void TransLogServer::domainVisit(FRT_RPCRequest *req) SerialNum from(params[1]._intval64); SerialNum to(params[2]._intval64); LOG(debug, "domainVisit(%s, %" PRIu64 ", %" PRIu64 ")", domainName, from, to); - retval = domain->visit(domain, from, to, *_supervisor, req->GetConnection()); + retval = domain->visit(domain, from, to, std::make_unique<RPCDestination>(*_supervisor, req->GetConnection())); } ret.AddInt32(retval); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 189be8c38d8..8aedfef6d8d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -8,6 +8,9 @@ #include <vespa/fnet/frt/invokable.h> #include <mutex> + +class FRT_Supervisor; + namespace search::common { class FileHeaderContext; } namespace search::transactionlog { |