diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-09-07 15:53:46 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-07 15:53:46 +0200 |
commit | 0428eb0abc8743121d20745d0f73cde7b742d63d (patch) | |
tree | e1383077e6a0b1cdf0beed3846f10f3b20f095b3 /searchlib | |
parent | b52d9df0eb64a4c46d6246d5f74bbb886e857547 (diff) | |
parent | 1cc9b065b9160600dcef93c6f041d774aea5d958 (diff) |
Merge pull request #6854 from vespa-engine/balder/locate-all-rpc-stuff-in-one-place
Use an interface to avoid exposing rpc more than necessary.
Diffstat (limited to 'searchlib')
6 files changed, 139 insertions, 112 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index b3b51e1de90..fc9518ccf1b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -322,12 +322,12 @@ bool Domain::erase(SerialNum to) } int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to, - FRT_Supervisor & supervisor, FNET_Connection *conn) + std::unique_ptr<Session::Destination> dest) { assert(this == domain.get()); cleanSessions(); SerialNumRange range(from, to); - auto session = std::make_shared<Session>(_sessionId++, range, domain, supervisor, conn); + auto session = std::make_shared<Session>(_sessionId++, range, domain, std::move(dest)); int id = session->id(); LockGuard guard(_sessionLock); _sessions[id] = std::move(session); diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index c1ff9157a6f..c0ee484926c 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -51,7 +51,7 @@ public: bool erase(SerialNum to); void commit(const Packet & packet); - int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn); + int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest); SerialNum begin() const; SerialNum end() const; diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index 1aacb4f842c..e703c32484f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "session.h" #include "domain.h" -#include <vespa/fnet/frt/supervisor.h> #include <vespa/fastlib/io/bufferedfile.h> #include <vespa/log/log.h> @@ -11,14 +10,10 @@ using vespalib::LockGuard; namespace search::transactionlog { -namespace { - const double NEVER(-1.0); -} - vespalib::Executor::Task::UP Session::createTask(const Session::SP & session) { - return Task::UP(new VisitTask(session)); + return std::make_unique<VisitTask>(session); } Session::VisitTask::VisitTask(const Session::SP & session) @@ -86,7 +81,7 @@ Session::visitOnly() } bool Session::finished() const { - return _finished || (_connection->GetState() != FNET_Connection::FNET_CONNECTED); + return _finished || ! _destination->connected(); } void @@ -99,80 +94,31 @@ Session::finalize() _finished = true; } -int32_t -Session::rpc(FRT_RPCRequest * req) -{ - int32_t retval(-7); - LOG(debug, "rpc %s starting.", req->GetMethodName()); - FRT_Supervisor::InvokeSync(_supervisor.GetTransport(), _connection, req, NEVER); - if (req->GetErrorCode() == FRTE_NO_ERROR) { - retval = (req->GetReturn()->GetValue(0)._intval32); - LOG(debug, "rpc %s = %d\n", req->GetMethodName(), retval); - } else if (req->GetErrorCode() == FRTE_RPC_TIMEOUT) { - LOG(warning, "rpc %s timed out. Will allow to continue: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); - retval = -req->GetErrorCode(); - } else { - if (req->GetErrorCode() != FRTE_RPC_CONNECTION) { - LOG(warning, "rpc %s: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); - } - retval = -req->GetErrorCode(); - _ok = false; - } - return retval; -} - Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d, - FRT_Supervisor & supervisor, FNET_Connection *conn) : - _supervisor(supervisor), - _connection(conn), + std::unique_ptr<Destination> destination) : + _destination(std::move(destination)), _domain(d), _range(r), _id(sId), - _ok(true), _visitRunning(false), _inSync(false), _finished(false), _startTime() { - _connection->AddRef(); } -Session::~Session() -{ - _connection->SubRef(); -} +Session::~Session() = default; bool Session::send(const Packet & packet) { - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); - req->SetMethodName("visitCallback"); - req->GetParams()->AddString(_domain->name().c_str()); - req->GetParams()->AddInt32(id()); - req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size()); - return send(req); -} - -bool -Session::send(FRT_RPCRequest * req) -{ - int32_t retval = rpc(req); - if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { - LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); - } - req->SubRef(); - - return (retval == RPC::OK); + return _destination->send(_id, _domain->name(), packet); } bool Session::sendDone() { - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); - req->SetMethodName("eofCallback"); - req->GetParams()->AddString(_domain->name().c_str()); - req->GetParams()->AddInt32(id()); - bool retval(send(req)); + bool retval = _destination->sendDone(_id, _domain->name()); _inSync = true; return retval; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index 6cd31ee4d42..bf35d83c000 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -4,9 +4,9 @@ #include "common.h" #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/sync.h> -#include <vespa/fnet/frt/invoker.h> #include <chrono> #include <deque> +#include <atomic> class FastOS_FileInterface; @@ -23,15 +23,22 @@ private: using time_point = std::chrono::time_point<std::chrono::steady_clock>; public: + class Destination { + public: + virtual ~Destination() {} + virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; + virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; + virtual bool connected() const = 0; + virtual bool ok() const = 0; + }; typedef std::shared_ptr<Session> SP; Session(const Session &) = delete; Session & operator = (const Session &) = delete; - Session(int sId, const SerialNumRange & r, const DomainSP & d, FRT_Supervisor & supervisor, FNET_Connection *conn); + Session(int sId, const SerialNumRange & r, const DomainSP & d, std::unique_ptr<Destination> destination); ~Session(); const SerialNumRange & range() const { return _range; } int id() const { return _id; } bool inSync() const { return _inSync; } - bool ok() const { return _ok; } bool finished() const; static Task::UP createTask(const Session::SP & session); void setStartTime(time_point startTime) { _startTime = startTime; } @@ -47,7 +54,7 @@ private: Session::SP _session; }; - bool send(FRT_RPCRequest * req); + bool ok() const { return _destination->ok(); } bool send(const Packet & packet); bool sendDone(); void visit(); @@ -55,17 +62,14 @@ private: void startVisit(); void finalize(); bool visit(FastOS_FileInterface & file, DomainPart & dp) __attribute__((noinline)); - int32_t rpc(FRT_RPCRequest * req); - FRT_Supervisor & _supervisor; - FNET_Connection * _connection; - DomainSP _domain; - SerialNumRange _range; - int _id; - bool _ok; - std::atomic<bool> _visitRunning; - std::atomic<bool> _inSync; - std::atomic<bool> _finished; - time_point _startTime; + std::unique_ptr<Destination> _destination; + DomainSP _domain; + SerialNumRange _range; + int _id; + std::atomic<bool> _visitRunning; + std::atomic<bool> _inSync; + std::atomic<bool> _finished; + time_point _startTime; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index dd6b63f9241..4b3e7bddb07 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -4,6 +4,8 @@ #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/task.h> #include <fstream> #include <vespa/log/log.h> @@ -26,21 +28,16 @@ class SyncHandler : public FNET_Task SerialNum _syncTo; public: - SyncHandler(FRT_Supervisor *supervisor, - FRT_RPCRequest *req,const Domain::SP &domain, - const TransLogServer::Session::SP &session, - SerialNum syncTo); + SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo); ~SyncHandler(); void PerformTask() override; }; -SyncHandler::SyncHandler(FRT_Supervisor *supervisor, - FRT_RPCRequest *req, - const Domain::SP &domain, - const TransLogServer::Session::SP &session, - SerialNum syncTo) +SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain, + const TransLogServer::Session::SP &session, SerialNum syncTo) : FNET_Task(supervisor->GetScheduler()), _req(*req), _domain(domain), @@ -50,9 +47,7 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor, } -SyncHandler::~SyncHandler() -{ -} +SyncHandler::~SyncHandler() = default; void @@ -154,14 +149,16 @@ TransLogServer::~TransLogServer() _supervisor->ShutDown(true); } -bool TransLogServer::onStop() +bool +TransLogServer::onStop() { LOG(info, "Stopping TLS"); _reqQ.push(NULL); return true; } -void TransLogServer::run() +void +TransLogServer::run() { FRT_RPCRequest *req(NULL); bool hasPacket(false); @@ -236,7 +233,8 @@ TransLogServer::findDomain(stringref domainName) return domain; } -void TransLogServer::exportRPC(FRT_Supervisor & supervisor) +void +TransLogServer::exportRPC(FRT_Supervisor & supervisor) { _supervisor->SetSessionInitHook(FRT_METHOD(TransLogServer::initSession), this); _supervisor->SetSessionFiniHook(FRT_METHOD(TransLogServer::finiSession), this); @@ -325,6 +323,8 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor) namespace { +constexpr double NEVER(-1.0); + void writeDomainDir(std::lock_guard<std::mutex> &guard, vespalib::string dir, @@ -344,9 +344,77 @@ writeDomainDir(std::lock_guard<std::mutex> &guard, vespalib::File::sync(dir); } +class RPCDestination : public Session::Destination { +public: + RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection) + : _supervisor(supervisor), _connection(connection), _ok(true) + { + _connection->AddRef(); + } + ~RPCDestination() override { _connection->SubRef(); } + + bool ok() const override { + return _ok; + } + + bool send(int32_t id, const vespalib::string & domain, const Packet & packet) override { + FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); + req->SetMethodName("visitCallback"); + req->GetParams()->AddString(domain.c_str()); + req->GetParams()->AddInt32(id); + req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size()); + return send(req); + } + + bool sendDone(int32_t id, const vespalib::string & domain) override { + FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); + req->SetMethodName("eofCallback"); + req->GetParams()->AddString(domain.c_str()); + req->GetParams()->AddInt32(id); + bool retval(send(req)); + return retval; + } + bool connected() const override { + return (_connection->GetState() <= FNET_Connection::FNET_CONNECTED); + } +private: + bool send(FRT_RPCRequest * req) { + int32_t retval = rpc(req); + if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { + LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); + } + req->SubRef(); + + return (retval == RPC::OK); + } + int32_t rpc(FRT_RPCRequest * req) { + int32_t retval(-7); + LOG(debug, "rpc %s starting.", req->GetMethodName()); + FRT_Supervisor::InvokeSync(_supervisor.GetTransport(), _connection, req, NEVER); + if (req->GetErrorCode() == FRTE_NO_ERROR) { + retval = (req->GetReturn()->GetValue(0)._intval32); + LOG(debug, "rpc %s = %d\n", req->GetMethodName(), retval); + } else if (req->GetErrorCode() == FRTE_RPC_TIMEOUT) { + LOG(warning, "rpc %s timed out. Will allow to continue: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); + retval = -req->GetErrorCode(); + } else { + if (req->GetErrorCode() != FRTE_RPC_CONNECTION) { + LOG(warning, "rpc %s: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage()); + } + retval = -req->GetErrorCode(); + _ok = false; + } + return retval; + } + FRT_Supervisor & _supervisor; + FNET_Connection * _connection; + bool _ok; +}; + } -void TransLogServer::createDomain(FRT_RPCRequest *req) +void +TransLogServer::createDomain(FRT_RPCRequest *req) { uint32_t retval(0); FRT_Values & params = *req->GetParams(); @@ -373,7 +441,8 @@ void TransLogServer::createDomain(FRT_RPCRequest *req) ret.AddInt32(retval); } -void TransLogServer::deleteDomain(FRT_RPCRequest *req) +void +TransLogServer::deleteDomain(FRT_RPCRequest *req) { uint32_t retval(0); vespalib::string msg("ok"); @@ -410,7 +479,8 @@ void TransLogServer::deleteDomain(FRT_RPCRequest *req) ret.AddString(msg.c_str()); } -void TransLogServer::openDomain(FRT_RPCRequest *req) +void +TransLogServer::openDomain(FRT_RPCRequest *req) { uint32_t retval(0); FRT_Values & params = *req->GetParams(); @@ -427,7 +497,8 @@ void TransLogServer::openDomain(FRT_RPCRequest *req) ret.AddInt32(retval); } -void TransLogServer::listDomains(FRT_RPCRequest *req) +void +TransLogServer::listDomains(FRT_RPCRequest *req) { FRT_Values & ret = *req->GetReturn(); LOG(debug, "listDomains()"); @@ -442,7 +513,8 @@ void TransLogServer::listDomains(FRT_RPCRequest *req) ret.AddString(domains.c_str()); } -void TransLogServer::domainStatus(FRT_RPCRequest *req) +void +TransLogServer::domainStatus(FRT_RPCRequest *req) { FRT_Values & params = *req->GetParams(); FRT_Values & ret = *req->GetReturn(); @@ -462,7 +534,8 @@ void TransLogServer::domainStatus(FRT_RPCRequest *req) } } -void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) +void +TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) { (void) done; Domain::SP domain(findDomain(domainName)); @@ -473,7 +546,8 @@ void TransLogServer::commit(const vespalib::string & domainName, const Packet & } } -void TransLogServer::domainCommit(FRT_RPCRequest *req) +void +TransLogServer::domainCommit(FRT_RPCRequest *req) { FRT_Values & params = *req->GetParams(); FRT_Values & ret = *req->GetReturn(); @@ -496,7 +570,8 @@ void TransLogServer::domainCommit(FRT_RPCRequest *req) } } -void TransLogServer::domainVisit(FRT_RPCRequest *req) +void +TransLogServer::domainVisit(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -508,12 +583,13 @@ void TransLogServer::domainVisit(FRT_RPCRequest *req) SerialNum from(params[1]._intval64); SerialNum to(params[2]._intval64); LOG(debug, "domainVisit(%s, %" PRIu64 ", %" PRIu64 ")", domainName, from, to); - retval = domain->visit(domain, from, to, *_supervisor, req->GetConnection()); + retval = domain->visit(domain, from, to, std::make_unique<RPCDestination>(*_supervisor, req->GetConnection())); } ret.AddInt32(retval); } -void TransLogServer::domainSessionRun(FRT_RPCRequest *req) +void +TransLogServer::domainSessionRun(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -529,13 +605,15 @@ void TransLogServer::domainSessionRun(FRT_RPCRequest *req) ret.AddInt32(retval); } -void TransLogServer::relayToThreadRPC(FRT_RPCRequest *req) +void +TransLogServer::relayToThreadRPC(FRT_RPCRequest *req) { req->Detach(); _reqQ.push(req); } -void TransLogServer::domainSessionClose(FRT_RPCRequest *req) +void +TransLogServer::domainSessionClose(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -552,7 +630,8 @@ void TransLogServer::domainSessionClose(FRT_RPCRequest *req) ret.AddInt32(retval); } -void TransLogServer::domainPrune(FRT_RPCRequest *req) +void +TransLogServer::domainPrune(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -572,7 +651,6 @@ void TransLogServer::domainPrune(FRT_RPCRequest *req) ret.AddInt32(retval); } - const TransLogServer::Session::SP & TransLogServer::getSession(FRT_RPCRequest *req) { @@ -582,14 +660,12 @@ TransLogServer::getSession(FRT_RPCRequest *req) return *sessionspp; } - void TransLogServer::initSession(FRT_RPCRequest *req) { req->GetConnection()->SetContext(new Session::SP(new Session())); } - void TransLogServer::finiSession(FRT_RPCRequest *req) { @@ -600,14 +676,12 @@ TransLogServer::finiSession(FRT_RPCRequest *req) delete sessionspp; } - void TransLogServer::downSession(FRT_RPCRequest *req) { getSession(req)->setDown(); } - void TransLogServer::domainSync(FRT_RPCRequest *req) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 189be8c38d8..8aedfef6d8d 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -8,6 +8,9 @@ #include <vespa/fnet/frt/invokable.h> #include <mutex> + +class FRT_Supervisor; + namespace search::common { class FileHeaderContext; } namespace search::transactionlog { |