aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@oath.com>2018-09-07 14:48:10 +0200
committerHenning Baldersheim <balder@oath.com>2018-09-07 14:49:02 +0200
commit8caf63c642461ff3a6d14610bddbdd887557d924 (patch)
tree43f2fbd0a14568bd431da3112f8bc480c5f88cae /searchlib
parent83fec633592486ef694856ece90eb76469a2a5f6 (diff)
Use an interface to avoid exposing rpc more than necessary.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp4
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp67
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.h32
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp90
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h3
6 files changed, 111 insertions, 87 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index b3b51e1de90..fc9518ccf1b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -322,12 +322,12 @@ bool Domain::erase(SerialNum to)
}
int Domain::visit(const Domain::SP & domain, SerialNum from, SerialNum to,
- FRT_Supervisor & supervisor, FNET_Connection *conn)
+ std::unique_ptr<Session::Destination> dest)
{
assert(this == domain.get());
cleanSessions();
SerialNumRange range(from, to);
- auto session = std::make_shared<Session>(_sessionId++, range, domain, supervisor, conn);
+ auto session = std::make_shared<Session>(_sessionId++, range, domain, std::move(dest));
int id = session->id();
LockGuard guard(_sessionLock);
_sessions[id] = std::move(session);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index c1ff9157a6f..c0ee484926c 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -51,7 +51,7 @@ public:
bool erase(SerialNum to);
void commit(const Packet & packet);
- int visit(const Domain::SP & self, SerialNum from, SerialNum to, FRT_Supervisor & supervisor, FNET_Connection *conn);
+ int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest);
SerialNum begin() const;
SerialNum end() const;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index 1aacb4f842c..ef36ff980d8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "session.h"
#include "domain.h"
-#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fastlib/io/bufferedfile.h>
#include <vespa/log/log.h>
@@ -11,14 +10,10 @@ using vespalib::LockGuard;
namespace search::transactionlog {
-namespace {
- const double NEVER(-1.0);
-}
-
vespalib::Executor::Task::UP
Session::createTask(const Session::SP & session)
{
- return Task::UP(new VisitTask(session));
+ return std::make_unique<VisitTask>(session);
}
Session::VisitTask::VisitTask(const Session::SP & session)
@@ -86,7 +81,7 @@ Session::visitOnly()
}
bool Session::finished() const {
- return _finished || (_connection->GetState() != FNET_Connection::FNET_CONNECTED);
+ return _finished || ! _destination->connected();
}
void
@@ -99,32 +94,9 @@ Session::finalize()
_finished = true;
}
-int32_t
-Session::rpc(FRT_RPCRequest * req)
-{
- int32_t retval(-7);
- LOG(debug, "rpc %s starting.", req->GetMethodName());
- FRT_Supervisor::InvokeSync(_supervisor.GetTransport(), _connection, req, NEVER);
- if (req->GetErrorCode() == FRTE_NO_ERROR) {
- retval = (req->GetReturn()->GetValue(0)._intval32);
- LOG(debug, "rpc %s = %d\n", req->GetMethodName(), retval);
- } else if (req->GetErrorCode() == FRTE_RPC_TIMEOUT) {
- LOG(warning, "rpc %s timed out. Will allow to continue: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
- retval = -req->GetErrorCode();
- } else {
- if (req->GetErrorCode() != FRTE_RPC_CONNECTION) {
- LOG(warning, "rpc %s: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
- }
- retval = -req->GetErrorCode();
- _ok = false;
- }
- return retval;
-}
-
Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d,
- FRT_Supervisor & supervisor, FNET_Connection *conn) :
- _supervisor(supervisor),
- _connection(conn),
+ std::unique_ptr<Destination> destination) :
+ _destination(std::move(destination)),
_domain(d),
_range(r),
_id(sId),
@@ -134,45 +106,20 @@ Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d,
_finished(false),
_startTime()
{
- _connection->AddRef();
}
-Session::~Session()
-{
- _connection->SubRef();
-}
+Session::~Session() = default;
bool
Session::send(const Packet & packet)
{
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
- req->SetMethodName("visitCallback");
- req->GetParams()->AddString(_domain->name().c_str());
- req->GetParams()->AddInt32(id());
- req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size());
- return send(req);
-}
-
-bool
-Session::send(FRT_RPCRequest * req)
-{
- int32_t retval = rpc(req);
- if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
- LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
- }
- req->SubRef();
-
- return (retval == RPC::OK);
+ return _destination->send(_id, _domain->name(), packet);
}
bool
Session::sendDone()
{
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
- req->SetMethodName("eofCallback");
- req->GetParams()->AddString(_domain->name().c_str());
- req->GetParams()->AddInt32(id());
- bool retval(send(req));
+ bool retval = _destination->sendDone(_id, _domain->name());
_inSync = true;
return retval;
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h
index 6cd31ee4d42..dde9c03d3b8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.h
@@ -4,9 +4,9 @@
#include "common.h"
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/sync.h>
-#include <vespa/fnet/frt/invoker.h>
#include <chrono>
#include <deque>
+#include <atomic>
class FastOS_FileInterface;
@@ -23,10 +23,17 @@ private:
using time_point = std::chrono::time_point<std::chrono::steady_clock>;
public:
+ class Destination {
+ public:
+ virtual ~Destination() {}
+ virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0;
+ virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0;
+ virtual bool connected() const = 0;
+ };
typedef std::shared_ptr<Session> SP;
Session(const Session &) = delete;
Session & operator = (const Session &) = delete;
- Session(int sId, const SerialNumRange & r, const DomainSP & d, FRT_Supervisor & supervisor, FNET_Connection *conn);
+ Session(int sId, const SerialNumRange & r, const DomainSP & d, std::unique_ptr<Destination> destination);
~Session();
const SerialNumRange & range() const { return _range; }
int id() const { return _id; }
@@ -47,7 +54,6 @@ private:
Session::SP _session;
};
- bool send(FRT_RPCRequest * req);
bool send(const Packet & packet);
bool sendDone();
void visit();
@@ -55,17 +61,15 @@ private:
void startVisit();
void finalize();
bool visit(FastOS_FileInterface & file, DomainPart & dp) __attribute__((noinline));
- int32_t rpc(FRT_RPCRequest * req);
- FRT_Supervisor & _supervisor;
- FNET_Connection * _connection;
- DomainSP _domain;
- SerialNumRange _range;
- int _id;
- bool _ok;
- std::atomic<bool> _visitRunning;
- std::atomic<bool> _inSync;
- std::atomic<bool> _finished;
- time_point _startTime;
+ std::unique_ptr<Destination> _destination;
+ DomainSP _domain;
+ SerialNumRange _range;
+ int _id;
+ bool _ok;
+ std::atomic<bool> _visitRunning;
+ std::atomic<bool> _inSync;
+ std::atomic<bool> _finished;
+ time_point _startTime;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index dd6b63f9241..c344ae7bbaa 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -4,6 +4,8 @@
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/task.h>
#include <fstream>
#include <vespa/log/log.h>
@@ -26,21 +28,16 @@ class SyncHandler : public FNET_Task
SerialNum _syncTo;
public:
- SyncHandler(FRT_Supervisor *supervisor,
- FRT_RPCRequest *req,const Domain::SP &domain,
- const TransLogServer::Session::SP &session,
- SerialNum syncTo);
+ SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req,const Domain::SP &domain,
+ const TransLogServer::Session::SP &session, SerialNum syncTo);
~SyncHandler();
void PerformTask() override;
};
-SyncHandler::SyncHandler(FRT_Supervisor *supervisor,
- FRT_RPCRequest *req,
- const Domain::SP &domain,
- const TransLogServer::Session::SP &session,
- SerialNum syncTo)
+SyncHandler::SyncHandler(FRT_Supervisor *supervisor, FRT_RPCRequest *req, const Domain::SP &domain,
+ const TransLogServer::Session::SP &session, SerialNum syncTo)
: FNET_Task(supervisor->GetScheduler()),
_req(*req),
_domain(domain),
@@ -325,6 +322,8 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
namespace {
+constexpr double NEVER(-1.0);
+
void
writeDomainDir(std::lock_guard<std::mutex> &guard,
vespalib::string dir,
@@ -344,6 +343,77 @@ writeDomainDir(std::lock_guard<std::mutex> &guard,
vespalib::File::sync(dir);
}
+class RPCDestination : public Session::Destination {
+public:
+ RPCDestination(FRT_Supervisor & supervisor, FNET_Connection * connection)
+ : _supervisor(supervisor), _connection(connection), _ok(true)
+ {
+ _connection->AddRef();
+ }
+ ~RPCDestination() override { _connection->SubRef(); }
+
+ bool
+ send(int32_t id, const vespalib::string & domain, const Packet & packet) override
+ {
+ FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
+ req->SetMethodName("visitCallback");
+ req->GetParams()->AddString(domain.c_str());
+ req->GetParams()->AddInt32(id);
+ req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size());
+ return send(req);
+ }
+
+ bool
+ sendDone(int32_t id, const vespalib::string & domain) override
+ {
+ FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
+ req->SetMethodName("eofCallback");
+ req->GetParams()->AddString(domain.c_str());
+ req->GetParams()->AddInt32(id);
+ bool retval(send(req));
+ return retval;
+ }
+ bool connected() const override {
+ return (_connection->GetState() != FNET_Connection::FNET_CONNECTED);
+ }
+private:
+ bool
+ send(FRT_RPCRequest * req)
+ {
+ int32_t retval = rpc(req);
+ if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
+ LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
+ }
+ req->SubRef();
+
+ return (retval == RPC::OK);
+ }
+ int32_t
+ rpc(FRT_RPCRequest * req)
+ {
+ int32_t retval(-7);
+ LOG(debug, "rpc %s starting.", req->GetMethodName());
+ FRT_Supervisor::InvokeSync(_supervisor.GetTransport(), _connection, req, NEVER);
+ if (req->GetErrorCode() == FRTE_NO_ERROR) {
+ retval = (req->GetReturn()->GetValue(0)._intval32);
+ LOG(debug, "rpc %s = %d\n", req->GetMethodName(), retval);
+ } else if (req->GetErrorCode() == FRTE_RPC_TIMEOUT) {
+ LOG(warning, "rpc %s timed out. Will allow to continue: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
+ retval = -req->GetErrorCode();
+ } else {
+ if (req->GetErrorCode() != FRTE_RPC_CONNECTION) {
+ LOG(warning, "rpc %s: error(%d): %s\n", req->GetMethodName(), req->GetErrorCode(), req->GetErrorMessage());
+ }
+ retval = -req->GetErrorCode();
+ _ok = false;
+ }
+ return retval;
+ }
+ FRT_Supervisor & _supervisor;
+ FNET_Connection * _connection;
+ bool _ok;
+};
+
}
void TransLogServer::createDomain(FRT_RPCRequest *req)
@@ -508,7 +578,7 @@ void TransLogServer::domainVisit(FRT_RPCRequest *req)
SerialNum from(params[1]._intval64);
SerialNum to(params[2]._intval64);
LOG(debug, "domainVisit(%s, %" PRIu64 ", %" PRIu64 ")", domainName, from, to);
- retval = domain->visit(domain, from, to, *_supervisor, req->GetConnection());
+ retval = domain->visit(domain, from, to, std::make_unique<RPCDestination>(*_supervisor, req->GetConnection()));
}
ret.AddInt32(retval);
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 189be8c38d8..8aedfef6d8d 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -8,6 +8,9 @@
#include <vespa/fnet/frt/invokable.h>
#include <mutex>
+
+class FRT_Supervisor;
+
namespace search::common { class FileHeaderContext; }
namespace search::transactionlog {