diff options
author | Håvard Pettersen <havardpe@oath.com> | 2018-09-04 12:16:55 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2018-09-05 13:25:25 +0000 |
commit | f163bfa304b627aa0fda87b13bf4d22f5bf21fbf (patch) | |
tree | 9b1d694b790ac407417ea5dfa0f0ddbb0ae500b7 /searchlib | |
parent | 3bcb5407524034b3206aef269fef52a196023504 (diff) |
remove non-instant invocation
Diffstat (limited to 'searchlib')
3 files changed, 58 insertions, 17 deletions
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp index aa2b558ea0c..767c8b45e10 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.cpp @@ -11,15 +11,40 @@ LOG_SETUP(".translogclient"); using namespace std::chrono_literals; +VESPA_THREAD_STACK_TAG(translogclient_rpc_callback) + namespace search::transactionlog { namespace { const double NEVER(-1.0); } +namespace { + +struct RpcTask : public vespalib::Executor::Task { + FRT_RPCRequest *req; + std::function<void(FRT_RPCRequest *req)> fun; + RpcTask(FRT_RPCRequest *req_in, std::function<void(FRT_RPCRequest *req)> &&fun_in) + : req(req_in), fun(std::move(fun_in)) {} + void run() override { + fun(req); + req->Return(); + req = nullptr; + } + ~RpcTask() { + if (req != nullptr) { + req->SetError(FRTE_RPC_METHOD_FAILED, "client has been shut down"); + req->Return(); + } + } +}; + +} + using vespalib::LockGuard; TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : + _executor(1, 128 * 1024, translogclient_rpc_callback), _rpcTarget(rpcTarget), _sessions(), _supervisor(std::make_unique<FRT_Supervisor>()), @@ -33,6 +58,7 @@ TransLogClient::TransLogClient(const vespalib::string & rpcTarget) : TransLogClient::~TransLogClient() { disconnect(); + _executor.shutdown().sync(); _supervisor->ShutDown(true); } @@ -139,7 +165,7 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor) FRT_ReflectionBuilder rb( & supervisor); //-- Visit Callbacks ----------------------------------------------------------- - rb.DefineMethod("visitCallback", "six", "i", false, FRT_METHOD(TransLogClient::visitCallbackRPC), this); + rb.DefineMethod("visitCallback", "six", "i", FRT_METHOD(TransLogClient::visitCallbackRPC_hook), this); rb.MethodDesc("Will return data asked from a subscriber/visitor."); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("session", "Session handle."); @@ -147,14 +173,15 @@ void TransLogClient::exportRPC(FRT_Supervisor & supervisor) rb.ReturnDesc("result", "A resultcode(int) of the operation. Non zero number indicates error."); //-- Visit Callbacks ----------------------------------------------------------- - rb.DefineMethod("eofCallback", "si", "i", false, FRT_METHOD(TransLogClient::eofCallbackRPC), this); + rb.DefineMethod("eofCallback", "si", "i", FRT_METHOD(TransLogClient::eofCallbackRPC_hook), this); rb.MethodDesc("Will tell you that you are done with the visitor."); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("session", "Session handle."); rb.ReturnDesc("result", "A resultcode(int) of the operation. Non zero number indicates error."); } -void TransLogClient::visitCallbackRPC(FRT_RPCRequest *req) + +void TransLogClient::do_visitCallbackRPC(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -171,7 +198,7 @@ void TransLogClient::visitCallbackRPC(FRT_RPCRequest *req) LOG(debug, "visitCallback(%s, %d)=%d done", domainName, sessionId, retval); } -void TransLogClient::eofCallbackRPC(FRT_RPCRequest *req) +void TransLogClient::do_eofCallbackRPC(FRT_RPCRequest *req) { uint32_t retval(uint32_t(-1)); FRT_Values & params = *req->GetParams(); @@ -188,6 +215,16 @@ void TransLogClient::eofCallbackRPC(FRT_RPCRequest *req) LOG(debug, "eofCallback(%s, %d)=%d done", domainName, sessionId, retval); } +void TransLogClient::visitCallbackRPC_hook(FRT_RPCRequest *req) +{ + _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_visitCallbackRPC(x); })); +} + +void TransLogClient::eofCallbackRPC_hook(FRT_RPCRequest *req) +{ + _executor.execute(std::make_unique<RpcTask>(req->Detach(), [this](FRT_RPCRequest *x){ do_eofCallbackRPC(x); })); +} + TransLogClient::Session::Session(const vespalib::string & domain, TransLogClient & tlc) : _tlc(tlc), diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h index 87901890673..267d6e3b0ed 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h @@ -5,6 +5,7 @@ #include <vespa/document/util/bytebuffer.h> #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/buffer.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fnet/frt/invokable.h> #include <map> #include <vector> @@ -96,8 +97,10 @@ public: const vespalib::string &getRPCTarget() const { return _rpcTarget; } private: void exportRPC(FRT_Supervisor & supervisor); - void visitCallbackRPC(FRT_RPCRequest *req); - void eofCallbackRPC(FRT_RPCRequest *req); + void do_visitCallbackRPC(FRT_RPCRequest *req); + void do_eofCallbackRPC(FRT_RPCRequest *req); + void visitCallbackRPC_hook(FRT_RPCRequest *req); + void eofCallbackRPC_hook(FRT_RPCRequest *req); int32_t rpc(FRT_RPCRequest * req); Session * findSession(const vespalib::string & domain, int sessionId); @@ -114,6 +117,7 @@ private: typedef std::map< SessionKey, Session * > SessionMap; + vespalib::ThreadStackExecutor _executor; vespalib::string _rpcTarget; SessionMap _sessions; //Brute force lock for subscriptions. For multithread safety. diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index 65bb682a389..dd6b63f9241 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -244,32 +244,32 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor) FRT_ReflectionBuilder rb( & supervisor); //-- Create Domain ----------------------------------------------------------- - rb.DefineMethod("createDomain", "s", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("createDomain", "s", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("Create a new domain."); rb.ParamDesc("name", "The name of the domain."); rb.ReturnDesc("handle", "A handle(int) to the domain. Negative number indicates error."); //-- Delete Domain ----------------------------------------------------------- - rb.DefineMethod("deleteDomain", "s", "is", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("deleteDomain", "s", "is", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("Create a new domain."); rb.ParamDesc("name", "The name of the domain."); rb.ReturnDesc("retval", "0 on success. Negative number indicates error."); rb.ReturnDesc("errormsg", "Message describing the error, if any."); //-- Open Domain ----------------------------------------------------------- - rb.DefineMethod("openDomain", "s", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("openDomain", "s", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("Open an existing domain."); rb.ParamDesc("name", "The name of the domain."); rb.ReturnDesc("handle", "A handle(int) to the domain. Negative number indicates error."); //-- List Domains ----------------------------------------------------------- - rb.DefineMethod("listDomains", "", "is", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("listDomains", "", "is", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("Will return a list of all the domains."); rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error."); rb.ReturnDesc("domains", "List of all the domains in a newline separated string"); //-- Domain Status ----------------------------------------------------------- - rb.DefineMethod("domainStatus", "s", "illl", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("domainStatus", "s", "illl", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("This will return key status information about the domain."); rb.ParamDesc("name", "The name of the domain."); rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error."); @@ -278,7 +278,7 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor) rb.ReturnDesc("size", "Number of elements in the log."); //-- Domain Commit ----------------------------------------------------------- - rb.DefineMethod("domainCommit", "sx", "is", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("domainCommit", "sx", "is", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("Will commit the data to the log."); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("packet", "The data to commit to the domain."); @@ -286,14 +286,14 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor) rb.ReturnDesc("message", "A textual description of the result code."); //-- Domain Prune ----------------------------------------------------------- - rb.DefineMethod("domainPrune", "sl", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("domainPrune", "sl", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("Will erase all operations prior to the serial number."); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("to", "Will erase all up and including."); rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error."); //-- Domain Visit ----------------------------------------------------------- - rb.DefineMethod("domainVisit", "sll", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("domainVisit", "sll", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("This will create a visitor that return all operations in the range."); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("from", "Will return all entries following(not including) <from>."); @@ -301,21 +301,21 @@ void TransLogServer::exportRPC(FRT_Supervisor & supervisor) rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error. Positive number is the sessionid"); //-- Domain Session Run ----------------------------------------------------------- - rb.DefineMethod("domainSessionRun", "si", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("domainSessionRun", "si", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("This will start the session thread."); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("sessionid", "The session identifier."); rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error."); //-- Domain Session Close ----------------------------------------------------------- - rb.DefineMethod("domainSessionClose", "si", "i", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("domainSessionClose", "si", "i", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("This will close the session."); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("sessionid", "The session identifier."); rb.ReturnDesc("result", "A resultcode(int) of the operation. Negative number indicates error. 1 means busy -> retry. 0 is OK."); //-- Domain Sync -- - rb.DefineMethod("domainSync", "sl", "il", true, FRT_METHOD(TransLogServer::relayToThreadRPC), this); + rb.DefineMethod("domainSync", "sl", "il", FRT_METHOD(TransLogServer::relayToThreadRPC), this); rb.MethodDesc("Sync domain to given entry"); rb.ParamDesc("name", "The name of the domain."); rb.ParamDesc("syncto", "Entry to sync to"); |