summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2018-09-04 12:16:55 +0000
committerHåvard Pettersen <havardpe@oath.com>2018-09-05 13:25:25 +0000
commitf163bfa304b627aa0fda87b13bf4d22f5bf21fbf (patch)
tree9b1d694b790ac407417ea5dfa0f0ddbb0ae500b7 /searchlib
parent3bcb5407524034b3206aef269fef52a196023504 (diff)
remove non-instant invocation
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp45
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogclient.h8
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp22
3 files changed, 58 insertions, 17 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
index aa2b558ea0c..767c8b45e10 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp
@@ -11,15 +11,40 @@ LOG_SETUP(".translogclient");
using namespace std::chrono_literals;
+VESPA_THREAD_STACK_TAG(translogclient_rpc_callback)
+
namespace search::transactionlog {
namespace {
const double NEVER(-1.0);
}
+namespace {
+
+struct RpcTask : public vespalib::Executor::Task {
+ FRT_RPCRequest *req;
+ std::function<void(FRT_RPCRequest *req)> fun;
+ RpcTask(FRT_RPCRequest *req_in, std::function<void(FRT_RPCRequest *req)> &&fun_in)
+ : req(req_in), fun(std::move(fun_in)) {}
+ void run() override {
+ fun(req);
+ req->Return();
+ req = nullptr;
+ }
+ ~RpcTask() {
+ if (req != nullptr) {
+ req->SetError(FRTE_RPC_METHOD_FAILED, "client has been shut down");
+ req->Return();
+ }
+ }
+};
+
+}
+
using vespalib::LockGuard;
TransLogClient::TransLogClient(const vespalib::string & rpcTarget) :
+ _executor(1, 128 * 1024, translogclient_rpc_callback),
_rpcTarget(rpcTarget),
_sessions(),
_supervisor(std::make_unique<FRT_Supervisor>()),
@@ -33,6 +58,7 @@ TransLogClient::TransLogClient(const vespalib::string & rpcTarget) :
TransLogClient::~TransLogClient()
{
disconnect();
+ _executor.shutdown().sync();
_supervisor->ShutDown(true);
}
@@ -139,7 +165,7 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor)
FRT_ReflectionBuilder rb( & supervisor);
//-- Visit Callbacks -----------------------------------------------------------
- rb.DefineMethod("visitCallback", "six", "i", false, FRT_METHOD(TransLogClient::visitCallbackRPC), this);
+ rb.DefineMethod("visitCallback", "six", "i", FRT_METHOD(TransLogClient::visitCallbackRPC_hook), this);
rb.MethodDesc("Will return data asked from a subscriber/visitor.");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("session", "Session handle.");
@@ -147,14 +173,15 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor)
rb.ReturnDesc("result", "A resultcode(int) of the operation. Non zero number indicates error.");
//-- Visit Callbacks -----------------------------------------------------------
- rb.DefineMethod("eofCallback", "si", "i", false, FRT_METHOD(TransLogClient::eofCallbackRPC), this);
+ rb.DefineMethod("eofCallback", "si", "i", FRT_METHOD(TransLogClient::eofCallbackRPC_hook), this);
rb.MethodDesc("Will tell you that you are done with the visitor.");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("session", "Session handle.");
rb.ReturnDesc("result", "A resultcode(int) of the operation. Non zero number indicates error.");
}
-void TransLogClient::visitCallbackRPC(FRT_RPCRequest *req)
+
+void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -171,7 +198,7 @@ void TransLogClient::visitCallbackRPC(FRT_RPCRequest *req)
LOG(debug, "visitCallback(%s, %d)=%d done", domainName, sessionId, retval);
}
-void TransLogClient::eofCallbackRPC(FRT_RPCRequest *req)
+void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req)
{
uint32_t retval(uint32_t(-1));
FRT_Values & params = *req->GetParams();
@@ -188,6 +215,16 @@ void TransLogClient::eofCallbackRPC(FRT_RPCRequest *req)
LOG(debug, "eofCallback(%s, %d)=%d done", domainName, sessionId, retval);
}
+void TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req)
+{
+ _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); }));
+}
+
+void TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req)
+{
+ _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); }));
+}
+
TransLogClient::Session::Session(const vespalib::string & domain, TransLogClient & tlc) :
_tlc(tlc),
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
index 87901890673..267d6e3b0ed 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
@@ -5,6 +5,7 @@
#include <vespa/document/util/bytebuffer.h>
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/buffer.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fnet/frt/invokable.h>
#include <map>
#include <vector>
@@ -96,8 +97,10 @@ public:
const vespalib::string &getRPCTarget() const { return _rpcTarget; }
private:
void exportRPC(FRT_Supervisor & supervisor);
- void visitCallbackRPC(FRT_RPCRequest *req);
- void eofCallbackRPC(FRT_RPCRequest *req);
+ void do_visitCallbackRPC(FRT_RPCRequest *req);
+ void do_eofCallbackRPC(FRT_RPCRequest *req);
+ void visitCallbackRPC_hook(FRT_RPCRequest *req);
+ void eofCallbackRPC_hook(FRT_RPCRequest *req);
int32_t rpc(FRT_RPCRequest * req);
Session * findSession(const vespalib::string & domain, int sessionId);
@@ -114,6 +117,7 @@ private:
typedef std::map< SessionKey, Session * > SessionMap;
+ vespalib::ThreadStackExecutor _executor;
vespalib::string _rpcTarget;
SessionMap _sessions;
//Brute force lock for subscriptions. For multithread safety.
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index 65bb682a389..dd6b63f9241 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -244,32 +244,32 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
FRT_ReflectionBuilder rb( & supervisor);
//-- Create Domain -----------------------------------------------------------
- rb.DefineMethod("createDomain", "s", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("createDomain", "s", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("Create a new domain.");
rb.ParamDesc("name", "The name of the domain.");
rb.ReturnDesc("handle", "A handle(int) to the domain. Negative number indicates error.");
//-- Delete Domain -----------------------------------------------------------
- rb.DefineMethod("deleteDomain", "s", "is", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("deleteDomain", "s", "is", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("Create a new domain.");
rb.ParamDesc("name", "The name of the domain.");
rb.ReturnDesc("retval", "0 on success. Negative number indicates error.");
rb.ReturnDesc("errormsg", "Message describing the error, if any.");
//-- Open Domain -----------------------------------------------------------
- rb.DefineMethod("openDomain", "s", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("openDomain", "s", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("Open an existing domain.");
rb.ParamDesc("name", "The name of the domain.");
rb.ReturnDesc("handle", "A handle(int) to the domain. Negative number indicates error.");
//-- List Domains -----------------------------------------------------------
- rb.DefineMethod("listDomains", "", "is", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("listDomains", "", "is", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("Will return a list of all the domains.");
rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error.");
rb.ReturnDesc("domains", "List of all the domains in a newline separated string");
//-- Domain Status -----------------------------------------------------------
- rb.DefineMethod("domainStatus", "s", "illl", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("domainStatus", "s", "illl", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("This will return key status information about the domain.");
rb.ParamDesc("name", "The name of the domain.");
rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error.");
@@ -278,7 +278,7 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
rb.ReturnDesc("size", "Number of elements in the log.");
//-- Domain Commit -----------------------------------------------------------
- rb.DefineMethod("domainCommit", "sx", "is", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("domainCommit", "sx", "is", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("Will commit the data to the log.");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("packet", "The data to commit to the domain.");
@@ -286,14 +286,14 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
rb.ReturnDesc("message", "A textual description of the result code.");
//-- Domain Prune -----------------------------------------------------------
- rb.DefineMethod("domainPrune", "sl", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("domainPrune", "sl", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("Will erase all operations prior to the serial number.");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("to", "Will erase all up and including.");
rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error.");
//-- Domain Visit -----------------------------------------------------------
- rb.DefineMethod("domainVisit", "sll", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("domainVisit", "sll", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("This will create a visitor that return all operations in the range.");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("from", "Will return all entries following(not including) <from>.");
@@ -301,21 +301,21 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor)
rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error. Positive number is the sessionid");
//-- Domain Session Run -----------------------------------------------------------
- rb.DefineMethod("domainSessionRun", "si", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("domainSessionRun", "si", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("This will start the session thread.");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("sessionid", "The session identifier.");
rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error.");
//-- Domain Session Close -----------------------------------------------------------
- rb.DefineMethod("domainSessionClose", "si", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("domainSessionClose", "si", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("This will close the session.");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("sessionid", "The session identifier.");
rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error. 1 means busy -> retry. 0 is OK.");
//-- Domain Sync --
- rb.DefineMethod("domainSync", "sl", "il", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this);
+ rb.DefineMethod("domainSync", "sl", "il", FRT_METHOD(TransLogServer::relayToThreadRPC), this);
rb.MethodDesc("Sync domain to given entry");
rb.ParamDesc("name", "The name of the domain.");
rb.ParamDesc("syncto", "Entry to sync to");