aboutsummaryrefslogtreecommitdiffstats
path: root/logd/src/logd/rpc_forwarder.cpp
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2019-04-01 11:23:16 +0000
committerGeir Storli <geirst@verizonmedia.com>2019-04-01 11:23:16 +0000
commit7fea70da5aca4da5e7cdafd0cc7f459f9d88e523 (patch)
tree381e44cc911d6b996654094c632700f25ed67933 /logd/src/logd/rpc_forwarder.cpp
parentb012478bd6aed5cf4d2f62854b6d800b8c020b0e (diff)
Integrate rpc forwarder and use it when logd config says so.
Diffstat (limited to 'logd/src/logd/rpc_forwarder.cpp')
-rw-r--r--logd/src/logd/rpc_forwarder.cpp59
1 files changed, 38 insertions, 21 deletions
diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp
index e515f463db4..654266f18af 100644
--- a/logd/src/logd/rpc_forwarder.cpp
+++ b/logd/src/logd/rpc_forwarder.cpp
@@ -17,6 +17,39 @@ using vespalib::make_string;
namespace logdemon {
+namespace {
+
+class GuardedRequest {
+private:
+ FRT_RPCRequest* _request;
+public:
+ GuardedRequest()
+ : _request(new FRT_RPCRequest())
+ {}
+ ~GuardedRequest() {
+ _request->SubRef();
+ }
+ FRT_RPCRequest& operator*() const { return *_request; }
+ FRT_RPCRequest* get() const { return _request; }
+ FRT_RPCRequest* operator->() const { return get(); }
+};
+
+}
+
+void
+RpcForwarder::ping_logserver()
+{
+ GuardedRequest request;
+ request->SetMethodName("frt.rpc.ping");
+ _target->InvokeSync(request.get(), _rpc_timeout_secs);
+ if (!request->CheckReturnTypes("")) {
+ auto error_msg = make_string("Error in rpc ping to logserver ('%s'): '%s'",
+ _connection_spec.c_str(), request->GetErrorMessage());
+ LOG(debug, "%s", error_msg.c_str());
+ throw ConnectionException(error_msg);
+ }
+}
+
RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor,
const vespalib::string &hostname, int rpc_port,
double rpc_timeout_secs, size_t max_messages_per_request)
@@ -30,6 +63,7 @@ RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor,
_forward_filter()
{
_target = supervisor.GetTarget(_connection_spec.c_str());
+ ping_logserver();
}
RpcForwarder::~RpcForwarder()
@@ -93,25 +127,6 @@ RpcForwarder::forwardLine(std::string_view line)
}
}
-namespace {
-
-class GuardedRequest {
-private:
- FRT_RPCRequest* _request;
-public:
- GuardedRequest()
- : _request(new FRT_RPCRequest())
- {}
- ~GuardedRequest() {
- _request->SubRef();
- }
- FRT_RPCRequest& operator*() const { return *_request; }
- FRT_RPCRequest* get() const { return _request; }
- FRT_RPCRequest* operator->() const { return get(); }
-};
-
-}
-
void
RpcForwarder::flush()
{
@@ -124,13 +139,15 @@ RpcForwarder::flush()
encode_log_request(proto_request, *request);
_target->InvokeSync(request.get(), _rpc_timeout_secs);
if (!request->CheckReturnTypes("bix")) {
- auto error_msg = make_string("Error in rpc reply from '%s': '%s'",
+ auto error_msg = make_string("Error in rpc reply from logserver ('%s'): '%s'",
_connection_spec.c_str(), request->GetErrorMessage());
+ LOG(warning, "%s", error_msg.c_str());
throw ConnectionException(error_msg);
}
ProtoConverter::ProtoLogResponse proto_response;
if (!decode_log_response(*request, proto_response)) {
- auto error_msg = make_string("Error during decoding of protobuf response from '%s'", _connection_spec.c_str());
+ auto error_msg = make_string("Error during decoding of protobuf response from logserver ('%s')", _connection_spec.c_str());
+ LOG(warning, "%s", error_msg.c_str());
throw DecodeException(error_msg);
}
_messages.clear();