summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-01-08 14:24:06 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2018-01-08 14:31:01 +0100
commit225e82fb0bc9f6cfe387a254a082ebfd8b65ef9c (patch)
treecd3570c67189fe72f683b79b636f58144c95f6fd /searchlib
parent025d1955fba9fbf0fcd9c3d05ad7d020229183e7 (diff)
- Remove some unused code.
- No need to sync executor just to know if visitation was complete.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp11
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp105
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.h36
3 files changed, 39 insertions, 113 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 1481f4cc839..44c7157d70e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -354,16 +354,17 @@ int Domain::closeSession(int sessionId)
if (found != _sessions.end()) {
sessionRunTime = (std::chrono::steady_clock::now() - found->second->getStartTime());
retval = 1;
- _sessionExecutor.sync();
}
}
- if (retval == 1) {
+ while (retval == 1) {
FastOS_Thread::Sleep(10);
- LockGuard guard(_sessionLock);
+ MonitorGuard guard(_sessionLock);
SessionList::iterator found = _sessions.find(sessionId);
if (found != _sessions.end()) {
- _sessions.erase(sessionId);
- retval = 0;
+ if ( ! found->second->isVisitRunning()) {
+ _sessions.erase(sessionId);
+ retval = 0;
+ }
} else {
retval = 0;
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index 6f0ab998e26..cbcbc68fdff 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -3,7 +3,6 @@
#include "domain.h"
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fastlib/io/bufferedfile.h>
-#include <vespa/vespalib/util/closuretask.h>
#include <vespa/log/log.h>
LOG_SETUP(".transactionlog.session");
@@ -22,22 +21,17 @@ Session::createTask(const Session::SP & session)
return Task::UP(new VisitTask(session));
}
-void
-Session::VisitTask::run()
+Session::VisitTask::VisitTask(const Session::SP & session)
+ : _session(session)
{
- _session->visitOnly();
+ _session->startVisit();
}
+Session::VisitTask::~VisitTask() = default;
void
-Session::SendTask::run()
-{
- _session->sendPending();
-}
-
-bool
-Session::inSync() const
+Session::VisitTask::run()
{
- return _inSync;
+ _session->visitOnly();
}
bool
@@ -49,7 +43,7 @@ Session::visit(FastOS_FileInterface & file, DomainPart & dp) {
} else {
more = dp.visit(_range, packet);
}
- if (packet.getHandle().size() > 0) {
+ if ( ! packet.getHandle().empty()) {
send(packet);
}
return more;
@@ -78,11 +72,17 @@ Session::visit()
}
void
+Session::startVisit() {
+ assert(!_visitRunning);
+ _visitRunning = true;
+}
+void
Session::visitOnly()
{
visit();
sendDone();
finalize();
+ _visitRunning = false;
}
bool Session::finished() const {
@@ -90,42 +90,6 @@ bool Session::finished() const {
}
void
-Session::enQ(const SP & session, SerialNum serial, const Packet & packet)
-{
- LockGuard guard(session->_lock);
- session->_packetQ.push_back(QPacket(serial,packet));
- if (session->_inSync) {
- session->_domain->execute(Task::UP(new SendTask(session)));
- }
-}
-
-void
-Session::sendPending()
-{
- for (;;) {
- QPacket packet;
- {
- LockGuard guard(_lock);
- if (_packetQ.empty() || !ok())
- break;
- packet = std::move(_packetQ.front());
- _packetQ.pop_front();
- }
- sendPacket(packet._serial, *packet._packet);
- }
-}
-
-void
-Session::sendPacket(SerialNum serial, const Packet & packet)
-{
- if (_range.from() < serial) {
- send(packet);
- } else {
- LOG(debug, "[%d] : Skipping %" PRIu64 ". Last sent is %" PRIu64, _id, serial, _range.from());
- }
-}
-
-void
Session::finalize()
{
if (!ok()) {
@@ -172,21 +136,6 @@ Session::RequestDone(FRT_RPCRequest * req)
req->SubRef();
}
-int32_t
-Session::rpcAsync(FRT_RPCRequest * req)
-{
- int32_t retval(-7);
- LOG(debug, "rpcAsync %s starting.", req->GetMethodName());
- FRT_Supervisor::InvokeAsync(_supervisor.GetTransport(), _connection, req, NEVER, this);
- if (ok()) {
- LOG(debug, "rpcAsync %s OK", req->GetMethodName());
- retval = 0;
- } else {
- LOG(warning, "rpcAsync %s FAILED", req->GetMethodName());
- }
- return retval;
-}
-
Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d,
FRT_Supervisor & supervisor, FNET_Connection *conn) :
_supervisor(supervisor),
@@ -194,12 +143,11 @@ Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d,
_domain(d),
_range(r),
_id(sId),
- _inSync(false),
_ok(true),
+ _visitRunning(false),
+ _inSync(false),
_finished(false),
- _packetQ(),
- _startTime(),
- _lock()
+ _startTime()
{
_connection->AddRef();
}
@@ -217,22 +165,18 @@ Session::send(const Packet & packet)
req->GetParams()->AddString(_domain->name().c_str());
req->GetParams()->AddInt32(id());
req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size());
- return send(req, true);
+ return send(req);
}
bool
-Session::send(FRT_RPCRequest * req, bool wait)
+Session::send(FRT_RPCRequest * req)
{
- int32_t retval(-1);
- if (wait) {
- retval = rpc(req);
- if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
- LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
- }
- req->SubRef();
- } else {
- retval = rpcAsync(req);
+ int32_t retval = rpc(req);
+ if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) {
+ LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval);
}
+ req->SubRef();
+
return (retval == RPC::OK);
}
@@ -243,8 +187,7 @@ Session::sendDone()
req->SetMethodName("eofCallback");
req->GetParams()->AddString(_domain->name().c_str());
req->GetParams()->AddInt32(id());
- bool retval(send(req, true));
- LockGuard guard(_lock);
+ bool retval(send(req));
_inSync = true;
return retval;
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h
index c42d6839dfa..29038ec5290 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.h
@@ -30,61 +30,43 @@ public:
~Session();
const SerialNumRange & range() const { return _range; }
int id() const { return _id; }
- bool inSync() const;
+ bool inSync() const { return _inSync; }
bool ok() const { return _ok; }
bool finished() const;
- static void enQ(const SP & session, SerialNum serial, const Packet & packet);
static Task::UP createTask(const Session::SP & session);
void setStartTime(time_point startTime) { _startTime = startTime; }
time_point getStartTime() const { return _startTime; }
+ bool isVisitRunning() const { return _visitRunning; }
private:
- struct QPacket {
- QPacket() : _serial(0), _packet() {}
- QPacket(SerialNum s, const Packet & p)
- : _serial(s),
- _packet(new Packet(p))
- { }
- SerialNum _serial;
- std::unique_ptr<Packet> _packet;
- };
class VisitTask : public Task {
public:
- VisitTask(const Session::SP & session) : _session(session) { }
+ VisitTask(const Session::SP & session);
+ ~VisitTask();
private:
void run() override;
Session::SP _session;
};
- class SendTask : public Task {
- public:
- SendTask(const Session::SP & session) : _session(session) { }
- void run() override;
- private:
- Session::SP _session;
- };
- bool send(FRT_RPCRequest * req, bool wait);
+ bool send(FRT_RPCRequest * req);
void RequestDone(FRT_RPCRequest *req) override;
bool send(const Packet & packet);
- void sendPacket(SerialNum serial, const Packet & packet);
bool sendDone();
- void sendPending();
void visit();
void visitOnly();
+ void startVisit();
void finalize();
bool visit(FastOS_FileInterface & file, DomainPart & dp) __attribute__((noinline));
int32_t rpc(FRT_RPCRequest * req);
- int32_t rpcAsync(FRT_RPCRequest * req);
FRT_Supervisor & _supervisor;
FNET_Connection * _connection;
DomainSP _domain;
SerialNumRange _range;
int _id;
- bool _inSync;
bool _ok;
- bool _finished;
- std::deque<QPacket> _packetQ;
+ std::atomic<bool> _visitRunning;
+ std::atomic<bool> _inSync;
+ std::atomic<bool> _finished;
time_point _startTime;
- vespalib::Lock _lock;
};
}