aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-09-07 15:53:46 +0200
committerGitHub <noreply@github.com>2018-09-07 15:53:46 +0200
commit0428eb0abc8743121d20745d0f73cde7b742d63d (patch)
treee1383077e6a0b1cdf0beed3846f10f3b20f095b3 /searchlib
parentb52d9df0eb64a4c46d6246d5f74bbb886e857547 (diff)
parent1cc9b065b9160600dcef93c6f041d774aea5d958 (diff)
Merge pull request #6854 from vespa-engine/balder/locate-all-rpc-stuff-in-one-place
Use an interface to avoid exposing rpc more than necessary.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp4
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp68
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.h34
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp140
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h3
6 files changed, 139 insertions, 112 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index b3b51e1de90..fc9518ccf1b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -322,12 +322,12 @@ bool Domain::erase(SerialNum to)
}
int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to,
- FRT_Supervisor & supervisor, FNET_Connection *conn)
+ std::unique_ptr<Session::Destination> dest)
{
assert(this == domain.get());
cleanSessions();
SerialNumRange range(from, to);
- auto session = std::make_shared<Session>(_sessionId++, range, domain, supervisor, conn);
+ auto session = std::make_shared<Session>(_sessionId++, range, domain, std::move(dest));
int id = session->id();
LockGuard guard(_sessionLock);
_sessions[id] = std::move(session);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index c1ff9157a6f..c0ee484926c 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -51,7 +51,7 @@ public:
bool erase(SerialNum to);
void commit(const Packet & packet);
- int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn);
+ int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest);
SerialNum begin() const;
SerialNum end() const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index 1aacb4f842c..e703c32484f 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "session.h"
#include "domain.h"
-#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fastlib/io/bufferedfile.h>
#include <vespa/log/log.h>
@@ -11,14 +10,10 @@ using vespalib::LockGuard;
namespace search::transactionlog {
-namespace {
- const double NEVER(-1.0);
-}
-
vespalib::Executor::Task::UP
Session::createTask(const Session::SP & session)
{
- return Task::UP(new VisitTask(session));
+ return std::make_unique<VisitTask>(session);
}
Session::VisitTask::VisitTask(const Session::SP & session)
@@ -86,7 +81,7 @@ Session::visitOnly()
}
bool Session::finished() const {
- return _finished || (_connection->GetState() != FNET_Connection::FNET_CONNECTED);
+ return _finished || ! _destination->connected();
}
void
@@ -99,80 +94,31 @@ Session::finalize()
_finished = true;
}
-int32_t
-Session::rpc(FRT_RPCRequest * req)
-{
- int32_t retval(-7);
- LOG(debug, "rpc %s starting.", req->GetMethodName());
- FRT_Supervisor::InvokeSync(_supervisor.GetTransport(), _connection, req, NEVER);
- if (req->GetErrorCode() == FRTE_NO_ERROR) {
- retval = (req->GetReturn()->GetValue(0)._intval32);
- LOG(debug, "rpc %s = %d\n", req->GetMethodName(), retval);
- } else if (req->GetErrorCode() == FRTE_RPC_TIMEOUT) {
- LOG(warning, "rpc %s timed out. Will allow to continue: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
- retval = -req->GetErrorCode();
- } else {
- if (req->GetErrorCode() != FRTE_RPC_CONNECTION) {
- LOG(warning, "rpc %s: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
- }
- retval = -req->GetErrorCode();
- _ok = false;
- }
- return retval;
-}
-
Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d,
- FRT_Supervisor & supervisor, FNET_Connection *conn) :
- _supervisor(supervisor),
- _connection(conn),
+ std::unique_ptr<Destination> destination) :
+ _destination(std::move(destination)),
_domain(d),
_range(r),
_id(sId),
- _ok(true),
_visitRunning(false),
_inSync(false),
_finished(false),
_startTime()
{
- _connection->AddRef();
}
-Session::~Session()
-{
- _connection->SubRef();
-}
+Session::~Session() = default;
bool
Session::send(const Packet & packet)
{
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
- req->SetMethodName("visitCallback");
- req->GetParams()->AddString(_domain->name().c_str());
- req->GetParams()->AddInt32(id());
- req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size());
- return send(req);
-}
-
-bool
-Session::send(FRT_RPCRequest * req)
-{
- int32_t retval = rpc(req);
- if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
- LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
- }
- req->SubRef();
-
- return (retval == RPC::OK);
+ return _destination->send(_id, _domain->name(), packet);
}
bool
Session::sendDone()
{
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
- req->SetMethodName("eofCallback");
- req->GetParams()->AddString(_domain->name().c_str());
- req->GetParams()->AddInt32(id());
- bool retval(send(req));
+ bool retval = _destination->sendDone(_id, _domain->name());
_inSync = true;
return retval;
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h
index 6cd31ee4d42..bf35d83c000 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.h
@@ -4,9 +4,9 @@
#include "common.h"
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/sync.h>
-#include <vespa/fnet/frt/invoker.h>
#include <chrono>
#include <deque>
+#include <atomic>
class FastOS_FileInterface;
@@ -23,15 +23,22 @@ private:
using time_point = std::chrono::time_point<std::chrono::steady_clock>;
public:
+ class Destination {
+ public:
+ virtual ~Destination() {}
+ virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0;
+ virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0;
+ virtual bool connected() const = 0;
+ virtual bool ok() const = 0;
+ };
typedef std::shared_ptr<Session> SP;
Session(const Session &) = delete;
Session & operator = (const Session &) = delete;
- Session(int sId, const SerialNumRange & r, const DomainSP & d, FRT_Supervisor & supervisor, FNET_Connection *conn);
+ Session(int sId, const SerialNumRange & r, const DomainSP & d, std::unique_ptr<Destination> destination);
~Session();
const SerialNumRange & range() const { return _range; }
int id() const { return _id; }
bool inSync() const { return _inSync; }
- bool ok() const { return _ok; }
bool finished() const;
static Task::UP createTask(const Session::SP & session);
void setStartTime(time_point startTime) { _startTime = startTime; }
@@ -47,7 +54,7 @@ private:
Session::SP _session;
};
- bool send(FRT_RPCRequest * req);
+ bool ok() const { return _destination->ok(); }
bool send(const Packet & packet);
bool sendDone();
void visit();
@@ -55,17 +62,14 @@ private:
void startVisit();
void finalize();
bool visit(FastOS_FileInterface & file, DomainPart & dp) __attribute__((noinline));
- int32_t rpc(FRT_RPCRequest * req);
- FRT_Supervisor & _supervisor;
- FNET_Connection * _connection;
- DomainSP _domain;
- SerialNumRange _range;
- int _id;
- bool _ok;
- std::atomic<bool> _visitRunning;
- std::atomic<bool> _inSync;
- std::atomic<bool> _finished;
- time_point _startTime;
+ std::unique_ptr<Destination> _destination;
+ DomainSP _domain;
+ SerialNumRange _range;
+ int _id;
+ std::atomic<bool> _visitRunning;
+ std::atomic<bool> _inSync;
+ std::atomic<bool> _finished;
+ time_point _startTime;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index dd6b63f9241..4b3e7bddb07 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -4,6 +4,8 @@
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/task.h>
#include <fstream>
#include <vespa/log/log.h>
@@ -26,21 +28,16 @@ class SyncHandler : public FNET_Task
SerialNum _syncTo;
public:
- SyncHandler(FRT_Supervisor *supervisor,
- FRT_RPCRequest *req,const Domain::SP &domain,
- const TransLogServer::Session::SP &session,
- SerialNum syncTo);
+ SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain,
+ const TransLogServer::Session::SP &session, SerialNum syncTo);
~SyncHandler();
void PerformTask() override;
};
-SyncHandler::SyncHandler(FRT_Supervisor *supervisor,
- FRT_RPCRequest *req,
- const Domain::SP &domain,
- const TransLogServer::Session::SP &session,
- SerialNum syncTo)
+SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain,
+ const TransLogServer::Session::SP &session, SerialNum syncTo)
: FNET_Task(supervisor->GetScheduler()),
_req(*req),
_domain(domain),
@@ -50,9 +47,7 @@ SyncHandler::SyncHandler(FRT_Supervisor *supervisor,
}
-SyncHandler::~SyncHandler()
-{
-}
+SyncHandler::~SyncHandler() = default;
void
@@ -154,14 +149,16 @@ TransLogServer::~TransLogServer()
_supervisor->ShutDown(true);
}
-bool TransLogServer::onStop()
+bool
+TransLogServer::onStop()
{
LOG(info, "Stopping TLS");
_reqQ.push(NULL);
return true;
}
-void TransLogServer::run()
+void
+TransLogServer::run()
{
FRT_RPCRequest *req(NULL);
bool hasPacket(false);
@@ -236,7 +233,8 @@ TransLogServer::findDomain(stringref domainName)
return domain;
}
-void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
+void
+TransLogServer::exportRPC(FRT_Supervisor & supervisor)
{
_supervisor->SetSessionInitHook(FRT_METHOD(TransLogServer::initSession), this);
_supervisor->SetSessionFiniHook(FRT_METHOD(TransLogServer::finiSession), this);
@@ -325,6 +323,8 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
namespace {
+constexpr double NEVER(-1.0);
+
void
writeDomainDir(std::lock_guard<std::mutex> &guard,
vespalib::string dir,
@@ -344,9 +344,77 @@ writeDomainDir(std::lock_guard<std::mutex> &guard,
vespalib::File::sync(dir);
}
+class RPCDestination : public Session::Destination {
+public:
+ RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection)
+ : _supervisor(supervisor), _connection(connection), _ok(true)
+ {
+ _connection->AddRef();
+ }
+ ~RPCDestination() override { _connection->SubRef(); }
+
+ bool ok() const override {
+ return _ok;
+ }
+
+ bool send(int32_t id, const vespalib::string & domain, const Packet & packet) override {
+ FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
+ req->SetMethodName("visitCallback");
+ req->GetParams()->AddString(domain.c_str());
+ req->GetParams()->AddInt32(id);
+ req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size());
+ return send(req);
+ }
+
+ bool sendDone(int32_t id, const vespalib::string & domain) override {
+ FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
+ req->SetMethodName("eofCallback");
+ req->GetParams()->AddString(domain.c_str());
+ req->GetParams()->AddInt32(id);
+ bool retval(send(req));
+ return retval;
+ }
+ bool connected() const override {
+ return (_connection->GetState() <= FNET_Connection::FNET_CONNECTED);
+ }
+private:
+ bool send(FRT_RPCRequest * req) {
+ int32_t retval = rpc(req);
+ if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
+ LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
+ }
+ req->SubRef();
+
+ return (retval == RPC::OK);
+ }
+ int32_t rpc(FRT_RPCRequest * req) {
+ int32_t retval(-7);
+ LOG(debug, "rpc %s starting.", req->GetMethodName());
+ FRT_Supervisor::InvokeSync(_supervisor.GetTransport(), _connection, req, NEVER);
+ if (req->GetErrorCode() == FRTE_NO_ERROR) {
+ retval = (req->GetReturn()->GetValue(0)._intval32);
+ LOG(debug, "rpc %s = %d\n", req->GetMethodName(), retval);
+ } else if (req->GetErrorCode() == FRTE_RPC_TIMEOUT) {
+ LOG(warning, "rpc %s timed out. Will allow to continue: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
+ retval = -req->GetErrorCode();
+ } else {
+ if (req->GetErrorCode() != FRTE_RPC_CONNECTION) {
+ LOG(warning, "rpc %s: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
+ }
+ retval = -req->GetErrorCode();
+ _ok = false;
+ }
+ return retval;
+ }
+ FRT_Supervisor & _supervisor;
+ FNET_Connection * _connection;
+ bool _ok;
+};
+
}
-void TransLogServer::createDomain(FRT_RPCRequest *req)
+void
+TransLogServer::createDomain(FRT_RPCRequest *req)
{
uint32_t retval(0);
FRT_Values & params = *req->GetParams();
@@ -373,7 +441,8 @@ void TransLogServer::createDomain(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void TransLogServer::deleteDomain(FRT_RPCRequest *req)
+void
+TransLogServer::deleteDomain(FRT_RPCRequest *req)
{
uint32_t retval(0);
vespalib::string msg("ok");
@@ -410,7 +479,8 @@ void TransLogServer::deleteDomain(FRT_RPCRequest *req)
ret.AddString(msg.c_str());
}
-void TransLogServer::openDomain(FRT_RPCRequest *req)
+void
+TransLogServer::openDomain(FRT_RPCRequest *req)
{
uint32_t retval(0);
FRT_Values & params = *req->GetParams();
@@ -427,7 +497,8 @@ void TransLogServer::openDomain(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void TransLogServer::listDomains(FRT_RPCRequest *req)
+void
+TransLogServer::listDomains(FRT_RPCRequest *req)
{
FRT_Values & ret = *req->GetReturn();
LOG(debug, "listDomains()");
@@ -442,7 +513,8 @@ void TransLogServer::listDomains(FRT_RPCRequest *req)
ret.AddString(domains.c_str());
}
-void TransLogServer::domainStatus(FRT_RPCRequest *req)
+void
+TransLogServer::domainStatus(FRT_RPCRequest *req)
{
FRT_Values & params = *req->GetParams();
FRT_Values & ret = *req->GetReturn();
@@ -462,7 +534,8 @@ void TransLogServer::domainStatus(FRT_RPCRequest *req)
}
}
-void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done)
+void
+TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done)
{
(void) done;
Domain::SP domain(findDomain(domainName));
@@ -473,7 +546,8 @@ void TransLogServer::commit(const vespalib::string & domainName, const Packet &
}
}
-void TransLogServer::domainCommit(FRT_RPCRequest *req)
+void
+TransLogServer::domainCommit(FRT_RPCRequest *req)
{
FRT_Values & params = *req->GetParams();
FRT_Values & ret = *req->GetReturn();
@@ -496,7 +570,8 @@ void TransLogServer::domainCommit(FRT_RPCRequest *req)
}
}
-void TransLogServer::domainVisit(FRT_RPCRequest *req)
+void
+TransLogServer::domainVisit(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -508,12 +583,13 @@ void TransLogServer::domainVisit(FRT_RPCRequest *req)
SerialNum from(params[1]._intval64);
SerialNum to(params[2]._intval64);
LOG(debug, "domainVisit(%s, %" PRIu64 ", %" PRIu64 ")", domainName, from, to);
- retval = domain->visit(domain, from, to, *_supervisor, req->GetConnection());
+ retval = domain->visit(domain, from, to, std::make_unique<RPCDestination>(*_supervisor, req->GetConnection()));
}
ret.AddInt32(retval);
}
-void TransLogServer::domainSessionRun(FRT_RPCRequest *req)
+void
+TransLogServer::domainSessionRun(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -529,13 +605,15 @@ void TransLogServer::domainSessionRun(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void TransLogServer::relayToThreadRPC(FRT_RPCRequest *req)
+void
+TransLogServer::relayToThreadRPC(FRT_RPCRequest *req)
{
req->Detach();
_reqQ.push(req);
}
-void TransLogServer::domainSessionClose(FRT_RPCRequest *req)
+void
+TransLogServer::domainSessionClose(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -552,7 +630,8 @@ void TransLogServer::domainSessionClose(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-void TransLogServer::domainPrune(FRT_RPCRequest *req)
+void
+TransLogServer::domainPrune(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -572,7 +651,6 @@ void TransLogServer::domainPrune(FRT_RPCRequest *req)
ret.AddInt32(retval);
}
-
const TransLogServer::Session::SP &
TransLogServer::getSession(FRT_RPCRequest *req)
{
@@ -582,14 +660,12 @@ TransLogServer::getSession(FRT_RPCRequest *req)
return *sessionspp;
}
-
void
TransLogServer::initSession(FRT_RPCRequest *req)
{
req->GetConnection()->SetContext(new Session::SP(new Session()));
}
-
void
TransLogServer::finiSession(FRT_RPCRequest *req)
{
@@ -600,14 +676,12 @@ TransLogServer::finiSession(FRT_RPCRequest *req)
delete sessionspp;
}
-
void
TransLogServer::downSession(FRT_RPCRequest *req)
{
getSession(req)->setDown();
}
-
void
TransLogServer::domainSync(FRT_RPCRequest *req)
{
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 189be8c38d8..8aedfef6d8d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -8,6 +8,9 @@
#include <vespa/fnet/frt/invokable.h>
#include <mutex>
+
+class FRT_Supervisor;
+
namespace search::common { class FileHeaderContext; }
namespace search::transactionlog {