diff options
author | Geir Storli <geirst@verizonmedia.com> | 2019-04-01 11:23:16 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2019-04-01 11:23:16 +0000 |
commit | 7fea70da5aca4da5e7cdafd0cc7f459f9d88e523 (patch) | |
tree | 381e44cc911d6b996654094c632700f25ed67933 | |
parent | b012478bd6aed5cf4d2f62854b6d800b8c020b0e (diff) |
Integrate rpc forwarder and use it when logd config says so.
-rw-r--r-- | logd/src/logd/config_subscriber.cpp | 33 | ||||
-rw-r--r-- | logd/src/logd/config_subscriber.h | 4 | ||||
-rw-r--r-- | logd/src/logd/rpc_forwarder.cpp | 59 | ||||
-rw-r--r-- | logd/src/logd/rpc_forwarder.h | 1 | ||||
-rw-r--r-- | logd/src/logd/watcher.cpp | 1 |
5 files changed, 71 insertions, 27 deletions
diff --git a/logd/src/logd/config_subscriber.cpp b/logd/src/logd/config_subscriber.cpp index d686052f6a1..8a7fa103fbc 100644 --- a/logd/src/logd/config_subscriber.cpp +++ b/logd/src/logd/config_subscriber.cpp @@ -3,6 +3,7 @@ #include "config_subscriber.h" #include "conn.h" #include "legacy_forwarder.h" +#include "rpc_forwarder.h" #include <fcntl.h> #include <unistd.h> @@ -46,6 +47,14 @@ ConfigSubscriber::configure(std::unique_ptr<LogdConfig> cfg) _logserver_port = newconf.logserver.port; _need_new_forwarder = true; } + if (newconf.logserver.rpcport != _logserver_rpc_port) { + _logserver_rpc_port = newconf.logserver.rpcport; + _need_new_forwarder = true; + } + if (newconf.logserver.userpc != _logserver_use_rpc) { + _logserver_use_rpc = newconf.logserver.userpc; + _need_new_forwarder = true; + } if (newconf.rotate.size > 0) { _rotate_size = newconf.rotate.size; } else { @@ -89,6 +98,8 @@ ConfigSubscriber::latch() ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri) : _logserver_host(), _logserver_port(0), + _logserver_rpc_port(0), + _logserver_use_rpc(false), _state_port(0), _forward_filter(), _rotate_size(INT_MAX), @@ -104,6 +115,7 @@ ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri) _handle = _subscriber.subscribe<LogdConfig>(configUri.getConfigId()); _subscriber.nextConfig(0); configure(_handle->getConfig()); + _supervisor.Start(); LOG(debug, "got logServer %s", _logserver_host.c_str()); LOG(debug, "got handle %p", _handle.get()); @@ -111,6 +123,7 @@ ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri) ConfigSubscriber::~ConfigSubscriber() { + _supervisor.ShutDown(true); LOG(debug, "forget logServer %s", _logserver_host.c_str()); LOG(debug, "done ~ConfSub()"); } @@ -118,12 +131,20 @@ ConfigSubscriber::~ConfigSubscriber() std::unique_ptr<Forwarder> ConfigSubscriber::make_forwarder(Metrics& metrics) { - LegacyForwarder::UP result = _use_logserver ? - LegacyForwarder::to_logserver(metrics, _logserver_host, _logserver_port) : - LegacyForwarder::to_dev_null(metrics); - result->setForwardMap(_forward_filter); - _need_new_forwarder = false; - return result; + if (_logserver_use_rpc) { + auto result = std::make_unique<RpcForwarder>(metrics, _supervisor, _logserver_host, + _logserver_rpc_port, 60.0, 100); + result->set_forward_filter(_forward_filter); + _need_new_forwarder = false; + return result; + } else { + auto result = _use_logserver ? + LegacyForwarder::to_logserver(metrics, _logserver_host, _logserver_port) : + LegacyForwarder::to_dev_null(metrics); + result->setForwardMap(_forward_filter); + _need_new_forwarder = false; + return result; + } } } diff --git a/logd/src/logd/config_subscriber.h b/logd/src/logd/config_subscriber.h index ab5d11478fb..db4630beb77 100644 --- a/logd/src/logd/config_subscriber.h +++ b/logd/src/logd/config_subscriber.h @@ -4,6 +4,7 @@ #include "forwarder.h" #include <logd/config-logd.h> #include <vespa/config/config.h> +#include <vespa/fnet/frt/supervisor.h> namespace logdemon { @@ -16,6 +17,8 @@ class ConfigSubscriber { private: std::string _logserver_host; int _logserver_port; + int _logserver_rpc_port; + bool _logserver_use_rpc; int _state_port; ForwardMap _forward_filter; int _rotate_size; @@ -27,6 +30,7 @@ private: config::ConfigHandle<cloud::config::log::LogdConfig>::UP _handle; bool _has_available; bool _need_new_forwarder; + FRT_Supervisor _supervisor; public: bool checkAvailable(); diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp index e515f463db4..654266f18af 100644 --- a/logd/src/logd/rpc_forwarder.cpp +++ b/logd/src/logd/rpc_forwarder.cpp @@ -17,6 +17,39 @@ using vespalib::make_string; namespace logdemon { +namespace { + +class GuardedRequest { +private: + FRT_RPCRequest* _request; +public: + GuardedRequest() + : _request(new FRT_RPCRequest()) + {} + ~GuardedRequest() { + _request->SubRef(); + } + FRT_RPCRequest& operator*() const { return *_request; } + FRT_RPCRequest* get() const { return _request; } + FRT_RPCRequest* operator->() const { return get(); } +}; + +} + +void +RpcForwarder::ping_logserver() +{ + GuardedRequest request; + request->SetMethodName("frt.rpc.ping"); + _target->InvokeSync(request.get(), _rpc_timeout_secs); + if (!request->CheckReturnTypes("")) { + auto error_msg = make_string("Error in rpc ping to logserver ('%s'): '%s'", + _connection_spec.c_str(), request->GetErrorMessage()); + LOG(debug, "%s", error_msg.c_str()); + throw ConnectionException(error_msg); + } +} + RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor, const vespalib::string &hostname, int rpc_port, double rpc_timeout_secs, size_t max_messages_per_request) @@ -30,6 +63,7 @@ RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor, _forward_filter() { _target = supervisor.GetTarget(_connection_spec.c_str()); + ping_logserver(); } RpcForwarder::~RpcForwarder() @@ -93,25 +127,6 @@ RpcForwarder::forwardLine(std::string_view line) } } -namespace { - -class GuardedRequest { -private: - FRT_RPCRequest* _request; -public: - GuardedRequest() - : _request(new FRT_RPCRequest()) - {} - ~GuardedRequest() { - _request->SubRef(); - } - FRT_RPCRequest& operator*() const { return *_request; } - FRT_RPCRequest* get() const { return _request; } - FRT_RPCRequest* operator->() const { return get(); } -}; - -} - void RpcForwarder::flush() { @@ -124,13 +139,15 @@ RpcForwarder::flush() encode_log_request(proto_request, *request); _target->InvokeSync(request.get(), _rpc_timeout_secs); if (!request->CheckReturnTypes("bix")) { - auto error_msg = make_string("Error in rpc reply from '%s': '%s'", + auto error_msg = make_string("Error in rpc reply from logserver ('%s'): '%s'", _connection_spec.c_str(), request->GetErrorMessage()); + LOG(warning, "%s", error_msg.c_str()); throw ConnectionException(error_msg); } ProtoConverter::ProtoLogResponse proto_response; if (!decode_log_response(*request, proto_response)) { - auto error_msg = make_string("Error during decoding of protobuf response from '%s'", _connection_spec.c_str()); + auto error_msg = make_string("Error during decoding of protobuf response from logserver ('%s')", _connection_spec.c_str()); + LOG(warning, "%s", error_msg.c_str()); throw DecodeException(error_msg); } _messages.clear(); diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h index e1e4bc95e06..0f224561830 100644 --- a/logd/src/logd/rpc_forwarder.h +++ b/logd/src/logd/rpc_forwarder.h @@ -26,6 +26,7 @@ private: int _bad_lines; ForwardMap _forward_filter; + void ping_logserver(); public: RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor, diff --git a/logd/src/logd/watcher.cpp b/logd/src/logd/watcher.cpp index d0633a22c99..a92ad456e9f 100644 --- a/logd/src/logd/watcher.cpp +++ b/logd/src/logd/watcher.cpp @@ -288,6 +288,7 @@ Watcher::watchfile() } } + _forwarder.flush(); dcf.saveState(already); if (_confsubscriber.checkAvailable()) { |