diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-08 14:24:06 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-08 14:31:01 +0100 |
commit | 225e82fb0bc9f6cfe387a254a082ebfd8b65ef9c (patch) | |
tree | cd3570c67189fe72f683b79b636f58144c95f6fd /searchlib | |
parent | 025d1955fba9fbf0fcd9c3d05ad7d020229183e7 (diff) |
- Remove some unused code.
- No need to sync executor just to know if visitation was complete.
Diffstat (limited to 'searchlib')
3 files changed, 39 insertions, 113 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 1481f4cc839..44c7157d70e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -354,16 +354,17 @@ int Domain::closeSession(int sessionId) if (found != _sessions.end()) { sessionRunTime = (std::chrono::steady_clock::now() - found->second->getStartTime()); retval = 1; - _sessionExecutor.sync(); } } - if (retval == 1) { + while (retval == 1) { FastOS_Thread::Sleep(10); - LockGuard guard(_sessionLock); + MonitorGuard guard(_sessionLock); SessionList::iterator found = _sessions.find(sessionId); if (found != _sessions.end()) { - _sessions.erase(sessionId); - retval = 0; + if ( ! found->second->isVisitRunning()) { + _sessions.erase(sessionId); + retval = 0; + } } else { retval = 0; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index 6f0ab998e26..cbcbc68fdff 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -3,7 +3,6 @@ #include "domain.h" #include <vespa/fnet/frt/supervisor.h> #include <vespa/fastlib/io/bufferedfile.h> -#include <vespa/vespalib/util/closuretask.h> #include <vespa/log/log.h> LOG_SETUP(".transactionlog.session"); @@ -22,22 +21,17 @@ Session::createTask(const Session::SP & session) return Task::UP(new VisitTask(session)); } -void -Session::VisitTask::run() +Session::VisitTask::VisitTask(const Session::SP & session) + : _session(session) { - _session->visitOnly(); + _session->startVisit(); } +Session::VisitTask::~VisitTask() = default; void -Session::SendTask::run() -{ - _session->sendPending(); -} - -bool -Session::inSync() const +Session::VisitTask::run() { - return _inSync; + _session->visitOnly(); } bool @@ -49,7 +43,7 @@ Session::visit(FastOS_FileInterface & file, DomainPart & dp) { } else { more = dp.visit(_range, packet); } - if (packet.getHandle().size() > 0) { + if ( ! packet.getHandle().empty()) { send(packet); } return more; @@ -78,11 +72,17 @@ Session::visit() } void +Session::startVisit() { + assert(!_visitRunning); + _visitRunning = true; +} +void Session::visitOnly() { visit(); sendDone(); finalize(); + _visitRunning = false; } bool Session::finished() const { @@ -90,42 +90,6 @@ bool Session::finished() const { } void -Session::enQ(const SP & session, SerialNum serial, const Packet & packet) -{ - LockGuard guard(session->_lock); - session->_packetQ.push_back(QPacket(serial,packet)); - if (session->_inSync) { - session->_domain->execute(Task::UP(new SendTask(session))); - } -} - -void -Session::sendPending() -{ - for (;;) { - QPacket packet; - { - LockGuard guard(_lock); - if (_packetQ.empty() || !ok()) - break; - packet = std::move(_packetQ.front()); - _packetQ.pop_front(); - } - sendPacket(packet._serial, *packet._packet); - } -} - -void -Session::sendPacket(SerialNum serial, const Packet & packet) -{ - if (_range.from() < serial) { - send(packet); - } else { - LOG(debug, "[%d] : Skipping %" PRIu64 ". Last sent is %" PRIu64, _id, serial, _range.from()); - } -} - -void Session::finalize() { if (!ok()) { @@ -172,21 +136,6 @@ Session::RequestDone(FRT_RPCRequest * req) req->SubRef(); } -int32_t -Session::rpcAsync(FRT_RPCRequest * req) -{ - int32_t retval(-7); - LOG(debug, "rpcAsync %s starting.", req->GetMethodName()); - FRT_Supervisor::InvokeAsync(_supervisor.GetTransport(), _connection, req, NEVER, this); - if (ok()) { - LOG(debug, "rpcAsync %s OK", req->GetMethodName()); - retval = 0; - } else { - LOG(warning, "rpcAsync %s FAILED", req->GetMethodName()); - } - return retval; -} - Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d, FRT_Supervisor & supervisor, FNET_Connection *conn) : _supervisor(supervisor), @@ -194,12 +143,11 @@ Session::Session(int sId, const SerialNumRange & r, const Domain::SP & d, _domain(d), _range(r), _id(sId), - _inSync(false), _ok(true), + _visitRunning(false), + _inSync(false), _finished(false), - _packetQ(), - _startTime(), - _lock() + _startTime() { _connection->AddRef(); } @@ -217,22 +165,18 @@ Session::send(const Packet & packet) req->GetParams()->AddString(_domain->name().c_str()); req->GetParams()->AddInt32(id()); req->GetParams()->AddData(packet.getHandle().c_str(), packet.getHandle().size()); - return send(req, true); + return send(req); } bool -Session::send(FRT_RPCRequest * req, bool wait) +Session::send(FRT_RPCRequest * req) { - int32_t retval(-1); - if (wait) { - retval = rpc(req); - if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { - LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); - } - req->SubRef(); - } else { - retval = rpcAsync(req); + int32_t retval = rpc(req); + if ( ! ((retval == RPC::OK) || (retval == FRTE_RPC_CONNECTION)) ) { + LOG(error, "Return value != OK(%d) in send for method 'visitCallback'.", retval); } + req->SubRef(); + return (retval == RPC::OK); } @@ -243,8 +187,7 @@ Session::sendDone() req->SetMethodName("eofCallback"); req->GetParams()->AddString(_domain->name().c_str()); req->GetParams()->AddInt32(id()); - bool retval(send(req, true)); - LockGuard guard(_lock); + bool retval(send(req)); _inSync = true; return retval; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index c42d6839dfa..29038ec5290 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -30,61 +30,43 @@ public: ~Session(); const SerialNumRange & range() const { return _range; } int id() const { return _id; } - bool inSync() const; + bool inSync() const { return _inSync; } bool ok() const { return _ok; } bool finished() const; - static void enQ(const SP & session, SerialNum serial, const Packet & packet); static Task::UP createTask(const Session::SP & session); void setStartTime(time_point startTime) { _startTime = startTime; } time_point getStartTime() const { return _startTime; } + bool isVisitRunning() const { return _visitRunning; } private: - struct QPacket { - QPacket() : _serial(0), _packet() {} - QPacket(SerialNum s, const Packet & p) - : _serial(s), - _packet(new Packet(p)) - { } - SerialNum _serial; - std::unique_ptr<Packet> _packet; - }; class VisitTask : public Task { public: - VisitTask(const Session::SP & session) : _session(session) { } + VisitTask(const Session::SP & session); + ~VisitTask(); private: void run() override; Session::SP _session; }; - class SendTask : public Task { - public: - SendTask(const Session::SP & session) : _session(session) { } - void run() override; - private: - Session::SP _session; - }; - bool send(FRT_RPCRequest * req, bool wait); + bool send(FRT_RPCRequest * req); void RequestDone(FRT_RPCRequest *req) override; bool send(const Packet & packet); - void sendPacket(SerialNum serial, const Packet & packet); bool sendDone(); - void sendPending(); void visit(); void visitOnly(); + void startVisit(); void finalize(); bool visit(FastOS_FileInterface & file, DomainPart & dp) __attribute__((noinline)); int32_t rpc(FRT_RPCRequest * req); - int32_t rpcAsync(FRT_RPCRequest * req); FRT_Supervisor & _supervisor; FNET_Connection * _connection; DomainSP _domain; SerialNumRange _range; int _id; - bool _inSync; bool _ok; - bool _finished; - std::deque<QPacket> _packetQ; + std::atomic<bool> _visitRunning; + std::atomic<bool> _inSync; + std::atomic<bool> _finished; time_point _startTime; - vespalib::Lock _lock; }; } |