diff options
author | Geir Storli <geirst@verizonmedia.com> | 2019-04-01 11:23:16 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2019-04-01 11:23:16 +0000 |
commit | 7fea70da5aca4da5e7cdafd0cc7f459f9d88e523 (patch) | |
tree | 381e44cc911d6b996654094c632700f25ed67933 /logd/src/logd/rpc_forwarder.cpp | |
parent | b012478bd6aed5cf4d2f62854b6d800b8c020b0e (diff) |
Integrate rpc forwarder and use it when logd config says so.
Diffstat (limited to 'logd/src/logd/rpc_forwarder.cpp')
-rw-r--r-- | logd/src/logd/rpc_forwarder.cpp | 59 |
1 files changed, 38 insertions, 21 deletions
diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp index e515f463db4..654266f18af 100644 --- a/logd/src/logd/rpc_forwarder.cpp +++ b/logd/src/logd/rpc_forwarder.cpp @@ -17,6 +17,39 @@ using vespalib::make_string; namespace logdemon { +namespace { + +class GuardedRequest { +private: + FRT_RPCRequest* _request; +public: + GuardedRequest() + : _request(new FRT_RPCRequest()) + {} + ~GuardedRequest() { + _request->SubRef(); + } + FRT_RPCRequest& operator*() const { return *_request; } + FRT_RPCRequest* get() const { return _request; } + FRT_RPCRequest* operator->() const { return get(); } +}; + +} + +void +RpcForwarder::ping_logserver() +{ + GuardedRequest request; + request->SetMethodName("frt.rpc.ping"); + _target->InvokeSync(request.get(), _rpc_timeout_secs); + if (!request->CheckReturnTypes("")) { + auto error_msg = make_string("Error in rpc ping to logserver ('%s'): '%s'", + _connection_spec.c_str(), request->GetErrorMessage()); + LOG(debug, "%s", error_msg.c_str()); + throw ConnectionException(error_msg); + } +} + RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor, const vespalib::string &hostname, int rpc_port, double rpc_timeout_secs, size_t max_messages_per_request) @@ -30,6 +63,7 @@ RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor, _forward_filter() { _target = supervisor.GetTarget(_connection_spec.c_str()); + ping_logserver(); } RpcForwarder::~RpcForwarder() @@ -93,25 +127,6 @@ RpcForwarder::forwardLine(std::string_view line) } } -namespace { - -class GuardedRequest { -private: - FRT_RPCRequest* _request; -public: - GuardedRequest() - : _request(new FRT_RPCRequest()) - {} - ~GuardedRequest() { - _request->SubRef(); - } - FRT_RPCRequest& operator*() const { return *_request; } - FRT_RPCRequest* get() const { return _request; } - FRT_RPCRequest* operator->() const { return get(); } -}; - -} - void RpcForwarder::flush() { @@ -124,13 +139,15 @@ RpcForwarder::flush() encode_log_request(proto_request, *request); _target->InvokeSync(request.get(), _rpc_timeout_secs); if (!request->CheckReturnTypes("bix")) { - auto error_msg = make_string("Error in rpc reply from '%s': '%s'", + auto error_msg = make_string("Error in rpc reply from logserver ('%s'): '%s'", _connection_spec.c_str(), request->GetErrorMessage()); + LOG(warning, "%s", error_msg.c_str()); throw ConnectionException(error_msg); } ProtoConverter::ProtoLogResponse proto_response; if (!decode_log_response(*request, proto_response)) { - auto error_msg = make_string("Error during decoding of protobuf response from '%s'", _connection_spec.c_str()); + auto error_msg = make_string("Error during decoding of protobuf response from logserver ('%s')", _connection_spec.c_str()); + LOG(warning, "%s", error_msg.c_str()); throw DecodeException(error_msg); } _messages.clear(); |