diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-06-01 15:32:15 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-06-01 18:03:25 +0200 |
commit | 61a4ea34c2ddd3b618249b6f6eb402069e40bccb (patch) | |
tree | 6d6e8d55bfd574a0d2d039a07099774d2545f32e /searchlib | |
parent | 54581b5ecd98230dc90fe787f1f02a33ebffbff6 (diff) |
Removed frt.h from header files to reduce include volume.
Diffstat (limited to 'searchlib')
6 files changed, 68 insertions, 69 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp index 734c7da8055..6e7e5e0d8f5 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/searchlib/transactionlog/session.h> -#include <vespa/searchlib/transactionlog/domain.h> +#include "session.h" +#include "domain.h" +#include <vespa/fnet/frt/supervisor.h> #include <vespa/fastlib/io/bufferedfile.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/log/log.h> @@ -95,6 +96,10 @@ Session::visitOnly() finalize(); } +bool Session::finished() const { + return _finished || (_connection->GetState() != FNET_Connection::FNET_CONNECTED); +} + void Session::enQ(const SP & session, SerialNum serial, const Packet & packet) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index 6616e311aa4..6c49d90d2fd 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -2,9 +2,9 @@ #pragma once #include "common.h" -#include <vespa/fnet/frt/frt.h> #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/sync.h> +#include <vespa/fnet/frt/invoker.h> #include <deque> class FastOS_FileInterface; @@ -32,7 +32,7 @@ public: bool inSync() const; bool continous() const { return _subscriber; } bool ok() const { return _ok; } - bool finished() const { return _finished || (_connection->GetState() != FNET_Connection::FNET_CONNECTED);} + bool finished() const; static void enQ(const SP & session, SerialNum serial, const Packet & packet); static Task::UP createTask(const Session::SP & session); private: diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp index df3aa1404ae..98b3e1b46ee 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp @@ -1,11 +1,16 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogclient.h" -#include <stdexcept> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <thread> #include <vespa/log/log.h> LOG_SETUP(".translogclient"); +using namespace std::chrono_literals; + namespace search { namespace transactionlog { @@ -18,27 +23,31 @@ using vespalib::LockGuard; TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : _rpcTarget(rpcTarget), _sessions(), - _supervisor(), + _supervisor(std::make_unique<FRT_Supervisor>()), _target(NULL) { reconnect(); - exportRPC(_supervisor); - _supervisor.Start(); + exportRPC(*_supervisor); + _supervisor->Start(); } TransLogClient::~TransLogClient() { disconnect(); - _supervisor.ShutDown(true); + _supervisor->ShutDown(true); } bool TransLogClient::reconnect() { disconnect(); - _target = _supervisor.Get2WayTarget(_rpcTarget.c_str()); + _target = _supervisor->Get2WayTarget(_rpcTarget.c_str()); return isConnected(); } +bool TransLogClient::isConnected() const { + return (_target != NULL) && _target->IsValid(); +} + void TransLogClient::disconnect() { if (_target) { @@ -48,7 +57,7 @@ void TransLogClient::disconnect() bool TransLogClient::create(const vespalib::string & domain) { - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("createDomain"); req->GetParams()->AddString(domain.c_str()); int32_t retval(rpc(req)); @@ -58,7 +67,7 @@ bool TransLogClient::create(const vespalib::string & domain) bool TransLogClient::remove(const vespalib::string & domain) { - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("deleteDomain"); req->GetParams()->AddString(domain.c_str()); int32_t retval(rpc(req)); @@ -69,7 +78,7 @@ bool TransLogClient::remove(const vespalib::string & domain) TransLogClient::Session::UP TransLogClient::open(const vespalib::string & domain) { Session::UP session; - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("openDomain"); req->GetParams()->AddString(domain.c_str()); int32_t retval(rpc(req)); @@ -92,7 +101,7 @@ TransLogClient::Visitor::UP TransLogClient::createVisitor(const vespalib::string bool TransLogClient::listDomains(std::vector<vespalib::string> & dir) { - FRT_RPCRequest *req = _supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _supervisor->AllocRPCRequest(); req->SetMethodName("listDomains"); int32_t retval(rpc(req)); if (retval == 0) { @@ -227,7 +236,7 @@ bool TransLogClient::Session::commit(const vespalib::ConstBufferRef & buf) { bool retval(true); if (buf.size() != 0) { - FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); req->SetMethodName("domainCommit"); req->GetParams()->AddString(_domain.c_str()); req->GetParams()->AddData(buf.c_str(), buf.size()); @@ -251,7 +260,7 @@ bool TransLogClient::Session::commit(const vespalib::ConstBufferRef & buf) bool TransLogClient::Session::status(SerialNum & b, SerialNum & e, size_t & count) { - FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); req->SetMethodName("domainStatus"); req->GetParams()->AddString(_domain.c_str()); int32_t retval(_tlc.rpc(req)); @@ -266,7 +275,7 @@ bool TransLogClient::Session::status(SerialNum & b, SerialNum & e, size_t & coun bool TransLogClient::Session::erase(const SerialNum & to) { - FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); req->SetMethodName("domainPrune"); req->GetParams()->AddString(_domain.c_str()); req->GetParams()->AddInt64(to); @@ -282,7 +291,7 @@ bool TransLogClient::Session::erase(const SerialNum & to) bool TransLogClient::Session::sync(const SerialNum &syncTo, SerialNum &syncedTo) { - FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); req->SetMethodName("domainSync"); FRT_Values & params = *req->GetParams(); params.AddString(_domain.c_str()); @@ -348,7 +357,7 @@ bool TransLogClient::Session::init(FRT_RPCRequest *req) bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to) { - FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); req->SetMethodName("domainVisit"); req->GetParams()->AddString(_domain.c_str()); req->GetParams()->AddInt64(from); @@ -358,7 +367,7 @@ bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to bool TransLogClient::Subscriber::subscribe(const SerialNum & from) { - FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); req->SetMethodName("domainSubscribe"); req->GetParams()->AddString(_domain.c_str()); req->GetParams()->AddInt64(from); @@ -367,7 +376,7 @@ bool TransLogClient::Subscriber::subscribe(const SerialNum & from) bool TransLogClient::Session::run() { - FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); req->SetMethodName("domainSessionRun"); req->GetParams()->AddString(_domain.c_str()); req->GetParams()->AddInt32(_sessionId); @@ -381,12 +390,12 @@ bool TransLogClient::Session::close() int retval(0); if (_sessionId > 0) { do { - FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest(); + FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest(); req->SetMethodName("domainSessionClose"); req->GetParams()->AddString(_domain.c_str()); req->GetParams()->AddInt32(_sessionId); if ( (retval = _tlc.rpc(req)) > 0) { - FastOS_Thread::Sleep(10); + std::this_thread::sleep_for(10ms); } req->SubRef(); } while ( retval == 1 ); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h index 802933b7d81..dbc53cc8608 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h @@ -3,14 +3,16 @@ #include "common.h" #include <vespa/document/util/bytebuffer.h> -#include <vespa/fnet/frt/frt.h> -#include <map> -#include <vector> #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/buffer.h> +#include <vespa/fnet/frt/invokable.h> +#include <map> +#include <vector> + +class FRT_Supervisor; +class FRT_Target; -namespace search { -namespace transactionlog { +namespace search::transactionlog { class TransLogClient : private FRT_Invokable { @@ -102,7 +104,7 @@ public: Subscriber::UP createSubscriber(const vespalib::string & domain, Session::Callback & callBack); Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack); - bool isConnected() const { return (_target != NULL) && _target->IsValid(); } + bool isConnected() const; void disconnect(); bool reconnect(); const vespalib::string &getRPCTarget() const { return _rpcTarget; } @@ -131,10 +133,9 @@ private: SessionMap _sessions; //Brute force lock for subscriptions. For multithread safety. vespalib::Lock _lock; - FRT_Supervisor _supervisor; + std::unique_ptr<FRT_Supervisor> _supervisor; FRT_Target * _target; }; } -} diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 81a52b186de..84f77c76f8c 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,12 +1,12 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "translogserver.h" -#include <fstream> #include <vespa/vespalib/util/stringfmt.h> -#include <stdexcept> -#include <vespa/log/log.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/fnet/frt/supervisor.h> +#include <fstream> +#include <vespa/log/log.h> LOG_SETUP(".transactionlog.server"); using vespalib::make_string; @@ -14,14 +14,9 @@ using vespalib::stringref; using vespalib::IllegalArgumentException; using search::common::FileHeaderContext; -namespace search -{ +namespace search::transactionlog { -namespace transactionlog -{ - -namespace -{ +namespace { class SyncHandler : public FNET_Task { @@ -98,7 +93,7 @@ TransLogServer::TransLogServer(const vespalib::string &name, _defaultCrcType(defaultCrcType), _executor(maxThreads, 128*1024), _threadPool(8192, 1), - _supervisor(), + _supervisor(std::make_unique<FRT_Supervisor>()), _domains(), _reqQ(), _fileHeaderContext(fileHeaderContext) @@ -125,13 +120,13 @@ TransLogServer::TransLogServer(const vespalib::string &name, } } } - exportRPC(_supervisor); + exportRPC(*_supervisor); char listenSpec[32]; sprintf(listenSpec, "tcp/%d", listenPort); bool listenOk(false); for (int i(600); !listenOk && i; i--) { - if (_supervisor.Listen(listenSpec)) { - _supervisor.Start(); + if (_supervisor->Listen(listenSpec)) { + _supervisor->Start(); listenOk = true; } else { LOG(warning, "Failed listening at port %s trying for %d seconds more.", listenSpec, i); @@ -154,7 +149,7 @@ TransLogServer::~TransLogServer() { stop(); join(); - _supervisor.ShutDown(true); + _supervisor->ShutDown(true); } bool TransLogServer::onStop() @@ -256,9 +251,9 @@ TransLogServer::findDomain(const stringref &domainName) void TransLogServer::exportRPC(FRT_Supervisor & supervisor) { - _supervisor.SetSessionInitHook(FRT_METHOD(TransLogServer::initSession), this); - _supervisor.SetSessionFiniHook(FRT_METHOD(TransLogServer::finiSession), this); - _supervisor.SetSessionDownHook(FRT_METHOD(TransLogServer::downSession), this); + _supervisor->SetSessionInitHook(FRT_METHOD(TransLogServer::initSession), this); + _supervisor->SetSessionFiniHook(FRT_METHOD(TransLogServer::finiSession), this); + _supervisor->SetSessionDownHook(FRT_METHOD(TransLogServer::downSession), this); FRT_ReflectionBuilder rb( & supervisor); //-- Create Domain ----------------------------------------------------------- @@ -521,7 +516,7 @@ void TransLogServer::domainSubscribe(FRT_RPCRequest *req) if (domain) { SerialNum from(params[1]._intval64); LOG(debug, "domainSubscribe(%s, %" PRIu64 ")", domainName, from); - retval = domain->subscribe(domain, from, _supervisor, req->GetConnection()); + retval = domain->subscribe(domain, from, *_supervisor, req->GetConnection()); } ret.AddInt32(retval); } @@ -538,7 +533,7 @@ void TransLogServer::domainVisit(FRT_RPCRequest *req) SerialNum from(params[1]._intval64); SerialNum to(params[2]._intval64); LOG(debug, "domainVisit(%s, %" PRIu64 ", %" PRIu64 ")", domainName, from, to); - retval = domain->visit(domain, from, to, _supervisor, req->GetConnection()); + retval = domain->visit(domain, from, to, *_supervisor, req->GetConnection()); } ret.AddInt32(retval); } @@ -656,14 +651,9 @@ TransLogServer::domainSync(FRT_RPCRequest *req) return; } - SyncHandler *syncHandler = new SyncHandler(&_supervisor, - req, - domain, - session, - syncTo); + SyncHandler *syncHandler = new SyncHandler(_supervisor.get(), req, domain, session, syncTo); syncHandler->ScheduleNow(); } } -} diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index e37204e9ed1..1ef94bbee02 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -1,23 +1,17 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/searchlib/transactionlog/domain.h> +#include "domain.h" #include <vespa/vespalib/util/document_runnable.h> #include <vespa/document/util/queue.h> +#include <vespa/fnet/frt/invokable.h> #include <mutex> -namespace search -{ - -namespace common -{ +namespace search { -class FileHeaderContext; +namespace common { class FileHeaderContext; } -} - -namespace transactionlog -{ +namespace transactionlog { class TransLogServerExplorer; @@ -96,7 +90,7 @@ private: const DomainPart::Crc _defaultCrcType; vespalib::ThreadStackExecutor _executor; FastOS_ThreadPool _threadPool; - FRT_Supervisor _supervisor; + std::unique_ptr<FRT_Supervisor> _supervisor; DomainList _domains; mutable std::mutex _lock; // Protects _domains std::mutex _fileLock; // Protects the creating and deleting domains including file system operations. |