summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-06-01 15:32:15 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-06-01 18:03:25 +0200
commit61a4ea34c2ddd3b618249b6f6eb402069e40bccb (patch)
tree6d6e8d55bfd574a0d2d039a07099774d2545f32e /searchlib
parent54581b5ecd98230dc90fe787f1f02a33ebffbff6 (diff)
Removed frt.h from header files to reduce include volume.
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.cpp9
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.h4
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp47
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.h17
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp42
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h18
6 files changed, 68 insertions, 69 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.cpp b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
index 734c7da8055..6e7e5e0d8f5 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.cpp
@@ -1,6 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/searchlib/transactionlog/session.h>
-#include <vespa/searchlib/transactionlog/domain.h>
+#include "session.h"
+#include "domain.h"
+#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fastlib/io/bufferedfile.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/log/log.h>
@@ -95,6 +96,10 @@ Session::visitOnly()
finalize();
}
+bool Session::finished() const {
+ return _finished || (_connection->GetState() != FNET_Connection::FNET_CONNECTED);
+}
+
void
Session::enQ(const SP & session, SerialNum serial, const Packet & packet)
{
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h
index 6616e311aa4..6c49d90d2fd 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.h
@@ -2,9 +2,9 @@
#pragma once
#include "common.h"
-#include <vespa/fnet/frt/frt.h>
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/sync.h>
+#include <vespa/fnet/frt/invoker.h>
#include <deque>
class FastOS_FileInterface;
@@ -32,7 +32,7 @@ public:
bool inSync() const;
bool continous() const { return _subscriber; }
bool ok() const { return _ok; }
- bool finished() const { return _finished || (_connection->GetState() != FNET_Connection::FNET_CONNECTED);}
+ bool finished() const;
static void enQ(const SP & session, SerialNum serial, const Packet & packet);
static Task::UP createTask(const Session::SP & session);
private:
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
index df3aa1404ae..98b3e1b46ee 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
@@ -1,11 +1,16 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogclient.h"
-#include <stdexcept>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".translogclient");
+using namespace std::chrono_literals;
+
namespace search {
namespace transactionlog {
@@ -18,27 +23,31 @@ using vespalib::LockGuard;
TransLogClient::TransLogClient(const vespalib::string & rpcTarget) :
_rpcTarget(rpcTarget),
_sessions(),
- _supervisor(),
+ _supervisor(std::make_unique<FRT_Supervisor>()),
_target(NULL)
{
reconnect();
- exportRPC(_supervisor);
- _supervisor.Start();
+ exportRPC(*_supervisor);
+ _supervisor->Start();
}
TransLogClient::~TransLogClient()
{
disconnect();
- _supervisor.ShutDown(true);
+ _supervisor->ShutDown(true);
}
bool TransLogClient::reconnect()
{
disconnect();
- _target = _supervisor.Get2WayTarget(_rpcTarget.c_str());
+ _target = _supervisor->Get2WayTarget(_rpcTarget.c_str());
return isConnected();
}
+bool TransLogClient::isConnected() const {
+ return (_target != NULL) && _target->IsValid();
+}
+
void TransLogClient::disconnect()
{
if (_target) {
@@ -48,7 +57,7 @@ void TransLogClient::disconnect()
bool TransLogClient::create(const vespalib::string & domain)
{
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("createDomain");
req->GetParams()->AddString(domain.c_str());
int32_t retval(rpc(req));
@@ -58,7 +67,7 @@ bool TransLogClient::create(const vespalib::string & domain)
bool TransLogClient::remove(const vespalib::string & domain)
{
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("deleteDomain");
req->GetParams()->AddString(domain.c_str());
int32_t retval(rpc(req));
@@ -69,7 +78,7 @@ bool TransLogClient::remove(const vespalib::string & domain)
TransLogClient::Session::UP TransLogClient::open(const vespalib::string & domain)
{
Session::UP session;
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("openDomain");
req->GetParams()->AddString(domain.c_str());
int32_t retval(rpc(req));
@@ -92,7 +101,7 @@ TransLogClient::Visitor::UP TransLogClient::createVisitor(const vespalib::string
bool TransLogClient::listDomains(std::vector<vespalib::string> & dir)
{
- FRT_RPCRequest *req = _supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _supervisor->AllocRPCRequest();
req->SetMethodName("listDomains");
int32_t retval(rpc(req));
if (retval == 0) {
@@ -227,7 +236,7 @@ bool TransLogClient::Session::commit(const vespalib::ConstBufferRef & buf)
{
bool retval(true);
if (buf.size() != 0) {
- FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
req->SetMethodName("domainCommit");
req->GetParams()->AddString(_domain.c_str());
req->GetParams()->AddData(buf.c_str(), buf.size());
@@ -251,7 +260,7 @@ bool TransLogClient::Session::commit(const vespalib::ConstBufferRef & buf)
bool TransLogClient::Session::status(SerialNum & b, SerialNum & e, size_t & count)
{
- FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
req->SetMethodName("domainStatus");
req->GetParams()->AddString(_domain.c_str());
int32_t retval(_tlc.rpc(req));
@@ -266,7 +275,7 @@ bool TransLogClient::Session::status(SerialNum & b, SerialNum & e, size_t & coun
bool TransLogClient::Session::erase(const SerialNum & to)
{
- FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
req->SetMethodName("domainPrune");
req->GetParams()->AddString(_domain.c_str());
req->GetParams()->AddInt64(to);
@@ -282,7 +291,7 @@ bool TransLogClient::Session::erase(const SerialNum & to)
bool
TransLogClient::Session::sync(const SerialNum &syncTo, SerialNum &syncedTo)
{
- FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
req->SetMethodName("domainSync");
FRT_Values & params = *req->GetParams();
params.AddString(_domain.c_str());
@@ -348,7 +357,7 @@ bool TransLogClient::Session::init(FRT_RPCRequest *req)
bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to)
{
- FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
req->SetMethodName("domainVisit");
req->GetParams()->AddString(_domain.c_str());
req->GetParams()->AddInt64(from);
@@ -358,7 +367,7 @@ bool TransLogClient::Visitor::visit(const SerialNum & from, const SerialNum & to
bool TransLogClient::Subscriber::subscribe(const SerialNum & from)
{
- FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
req->SetMethodName("domainSubscribe");
req->GetParams()->AddString(_domain.c_str());
req->GetParams()->AddInt64(from);
@@ -367,7 +376,7 @@ bool TransLogClient::Subscriber::subscribe(const SerialNum & from)
bool TransLogClient::Session::run()
{
- FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
req->SetMethodName("domainSessionRun");
req->GetParams()->AddString(_domain.c_str());
req->GetParams()->AddInt32(_sessionId);
@@ -381,12 +390,12 @@ bool TransLogClient::Session::close()
int retval(0);
if (_sessionId > 0) {
do {
- FRT_RPCRequest *req = _tlc._supervisor.AllocRPCRequest();
+ FRT_RPCRequest *req = _tlc._supervisor->AllocRPCRequest();
req->SetMethodName("domainSessionClose");
req->GetParams()->AddString(_domain.c_str());
req->GetParams()->AddInt32(_sessionId);
if ( (retval = _tlc.rpc(req)) > 0) {
- FastOS_Thread::Sleep(10);
+ std::this_thread::sleep_for(10ms);
}
req->SubRef();
} while ( retval == 1 );
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
index 802933b7d81..dbc53cc8608 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
@@ -3,14 +3,16 @@
#include "common.h"
#include <vespa/document/util/bytebuffer.h>
-#include <vespa/fnet/frt/frt.h>
-#include <map>
-#include <vector>
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/buffer.h>
+#include <vespa/fnet/frt/invokable.h>
+#include <map>
+#include <vector>
+
+class FRT_Supervisor;
+class FRT_Target;
-namespace search {
-namespace transactionlog {
+namespace search::transactionlog {
class TransLogClient : private FRT_Invokable
{
@@ -102,7 +104,7 @@ public:
Subscriber::UP createSubscriber(const vespalib::string & domain, Session::Callback & callBack);
Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack);
- bool isConnected() const { return (_target != NULL) && _target->IsValid(); }
+ bool isConnected() const;
void disconnect();
bool reconnect();
const vespalib::string &getRPCTarget() const { return _rpcTarget; }
@@ -131,10 +133,9 @@ private:
SessionMap _sessions;
//Brute force lock for subscriptions. For multithread safety.
vespalib::Lock _lock;
- FRT_Supervisor _supervisor;
+ std::unique_ptr<FRT_Supervisor> _supervisor;
FRT_Target * _target;
};
}
-}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 81a52b186de..84f77c76f8c 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -1,12 +1,12 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogserver.h"
-#include <fstream>
#include <vespa/vespalib/util/stringfmt.h>
-#include <stdexcept>
-#include <vespa/log/log.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <fstream>
+#include <vespa/log/log.h>
LOG_SETUP(".transactionlog.server");
using vespalib::make_string;
@@ -14,14 +14,9 @@ using vespalib::stringref;
using vespalib::IllegalArgumentException;
using search::common::FileHeaderContext;
-namespace search
-{
+namespace search::transactionlog {
-namespace transactionlog
-{
-
-namespace
-{
+namespace {
class SyncHandler : public FNET_Task
{
@@ -98,7 +93,7 @@ TransLogServer::TransLogServer(const vespalib::string &name,
_defaultCrcType(defaultCrcType),
_executor(maxThreads, 128*1024),
_threadPool(8192, 1),
- _supervisor(),
+ _supervisor(std::make_unique<FRT_Supervisor>()),
_domains(),
_reqQ(),
_fileHeaderContext(fileHeaderContext)
@@ -125,13 +120,13 @@ TransLogServer::TransLogServer(const vespalib::string &name,
}
}
}
- exportRPC(_supervisor);
+ exportRPC(*_supervisor);
char listenSpec[32];
sprintf(listenSpec, "tcp/%d", listenPort);
bool listenOk(false);
for (int i(600); !listenOk && i; i--) {
- if (_supervisor.Listen(listenSpec)) {
- _supervisor.Start();
+ if (_supervisor->Listen(listenSpec)) {
+ _supervisor->Start();
listenOk = true;
} else {
LOG(warning, "Failed listening at port %s trying for %d seconds more.", listenSpec, i);
@@ -154,7 +149,7 @@ TransLogServer::~TransLogServer()
{
stop();
join();
- _supervisor.ShutDown(true);
+ _supervisor->ShutDown(true);
}
bool TransLogServer::onStop()
@@ -256,9 +251,9 @@ TransLogServer::findDomain(const stringref &domainName)
void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
{
- _supervisor.SetSessionInitHook(FRT_METHOD(TransLogServer::initSession), this);
- _supervisor.SetSessionFiniHook(FRT_METHOD(TransLogServer::finiSession), this);
- _supervisor.SetSessionDownHook(FRT_METHOD(TransLogServer::downSession), this);
+ _supervisor->SetSessionInitHook(FRT_METHOD(TransLogServer::initSession), this);
+ _supervisor->SetSessionFiniHook(FRT_METHOD(TransLogServer::finiSession), this);
+ _supervisor->SetSessionDownHook(FRT_METHOD(TransLogServer::downSession), this);
FRT_ReflectionBuilder rb( & supervisor);
//-- Create Domain -----------------------------------------------------------
@@ -521,7 +516,7 @@ void TransLogServer::domainSubscribe(FRT_RPCRequest *req)
if (domain) {
SerialNum from(params[1]._intval64);
LOG(debug, "domainSubscribe(%s, %" PRIu64 ")", domainName, from);
- retval = domain->subscribe(domain, from, _supervisor, req->GetConnection());
+ retval = domain->subscribe(domain, from, *_supervisor, req->GetConnection());
}
ret.AddInt32(retval);
}
@@ -538,7 +533,7 @@ void TransLogServer::domainVisit(FRT_RPCRequest *req)
SerialNum from(params[1]._intval64);
SerialNum to(params[2]._intval64);
LOG(debug, "domainVisit(%s, %" PRIu64 ", %" PRIu64 ")", domainName, from, to);
- retval = domain->visit(domain, from, to, _supervisor, req->GetConnection());
+ retval = domain->visit(domain, from, to, *_supervisor, req->GetConnection());
}
ret.AddInt32(retval);
}
@@ -656,14 +651,9 @@ TransLogServer::domainSync(FRT_RPCRequest *req)
return;
}
- SyncHandler *syncHandler = new SyncHandler(&_supervisor,
- req,
- domain,
- session,
- syncTo);
+ SyncHandler *syncHandler = new SyncHandler(_supervisor.get(), req, domain, session, syncTo);
syncHandler->ScheduleNow();
}
}
-}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index e37204e9ed1..1ef94bbee02 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -1,23 +1,17 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/searchlib/transactionlog/domain.h>
+#include "domain.h"
#include <vespa/vespalib/util/document_runnable.h>
#include <vespa/document/util/queue.h>
+#include <vespa/fnet/frt/invokable.h>
#include <mutex>
-namespace search
-{
-
-namespace common
-{
+namespace search {
-class FileHeaderContext;
+namespace common { class FileHeaderContext; }
-}
-
-namespace transactionlog
-{
+namespace transactionlog {
class TransLogServerExplorer;
@@ -96,7 +90,7 @@ private:
const DomainPart::Crc _defaultCrcType;
vespalib::ThreadStackExecutor _executor;
FastOS_ThreadPool _threadPool;
- FRT_Supervisor _supervisor;
+ std::unique_ptr<FRT_Supervisor> _supervisor;
DomainList _domains;
mutable std::mutex _lock; // Protects _domains
std::mutex _fileLock; // Protects the creating and deleting domains including file system operations.