aboutsummaryrefslogtreecommitdiffstats
path: root/logd
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2019-04-01 11:23:16 +0000
committerGeir Storli <geirst@verizonmedia.com>2019-04-01 11:23:16 +0000
commit7fea70da5aca4da5e7cdafd0cc7f459f9d88e523 (patch)
tree381e44cc911d6b996654094c632700f25ed67933 /logd
parentb012478bd6aed5cf4d2f62854b6d800b8c020b0e (diff)
Integrate rpc forwarder and use it when logd config says so.
Diffstat (limited to 'logd')
-rw-r--r--logd/src/logd/config_subscriber.cpp33
-rw-r--r--logd/src/logd/config_subscriber.h4
-rw-r--r--logd/src/logd/rpc_forwarder.cpp59
-rw-r--r--logd/src/logd/rpc_forwarder.h1
-rw-r--r--logd/src/logd/watcher.cpp1
5 files changed, 71 insertions, 27 deletions
diff --git a/logd/src/logd/config_subscriber.cpp b/logd/src/logd/config_subscriber.cpp
index d686052f6a1..8a7fa103fbc 100644
--- a/logd/src/logd/config_subscriber.cpp
+++ b/logd/src/logd/config_subscriber.cpp
@@ -3,6 +3,7 @@
#include "config_subscriber.h"
#include "conn.h"
#include "legacy_forwarder.h"
+#include "rpc_forwarder.h"
#include <fcntl.h>
#include <unistd.h>
@@ -46,6 +47,14 @@ ConfigSubscriber::configure(std::unique_ptr<LogdConfig> cfg)
_logserver_port = newconf.logserver.port;
_need_new_forwarder = true;
}
+ if (newconf.logserver.rpcport != _logserver_rpc_port) {
+ _logserver_rpc_port = newconf.logserver.rpcport;
+ _need_new_forwarder = true;
+ }
+ if (newconf.logserver.userpc != _logserver_use_rpc) {
+ _logserver_use_rpc = newconf.logserver.userpc;
+ _need_new_forwarder = true;
+ }
if (newconf.rotate.size > 0) {
_rotate_size = newconf.rotate.size;
} else {
@@ -89,6 +98,8 @@ ConfigSubscriber::latch()
ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri)
: _logserver_host(),
_logserver_port(0),
+ _logserver_rpc_port(0),
+ _logserver_use_rpc(false),
_state_port(0),
_forward_filter(),
_rotate_size(INT_MAX),
@@ -104,6 +115,7 @@ ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri)
_handle = _subscriber.subscribe<LogdConfig>(configUri.getConfigId());
_subscriber.nextConfig(0);
configure(_handle->getConfig());
+ _supervisor.Start();
LOG(debug, "got logServer %s", _logserver_host.c_str());
LOG(debug, "got handle %p", _handle.get());
@@ -111,6 +123,7 @@ ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri)
ConfigSubscriber::~ConfigSubscriber()
{
+ _supervisor.ShutDown(true);
LOG(debug, "forget logServer %s", _logserver_host.c_str());
LOG(debug, "done ~ConfSub()");
}
@@ -118,12 +131,20 @@ ConfigSubscriber::~ConfigSubscriber()
std::unique_ptr<Forwarder>
ConfigSubscriber::make_forwarder(Metrics& metrics)
{
- LegacyForwarder::UP result = _use_logserver ?
- LegacyForwarder::to_logserver(metrics, _logserver_host, _logserver_port) :
- LegacyForwarder::to_dev_null(metrics);
- result->setForwardMap(_forward_filter);
- _need_new_forwarder = false;
- return result;
+ if (_logserver_use_rpc) {
+ auto result = std::make_unique<RpcForwarder>(metrics, _supervisor, _logserver_host,
+ _logserver_rpc_port, 60.0, 100);
+ result->set_forward_filter(_forward_filter);
+ _need_new_forwarder = false;
+ return result;
+ } else {
+ auto result = _use_logserver ?
+ LegacyForwarder::to_logserver(metrics, _logserver_host, _logserver_port) :
+ LegacyForwarder::to_dev_null(metrics);
+ result->setForwardMap(_forward_filter);
+ _need_new_forwarder = false;
+ return result;
+ }
}
}
diff --git a/logd/src/logd/config_subscriber.h b/logd/src/logd/config_subscriber.h
index ab5d11478fb..db4630beb77 100644
--- a/logd/src/logd/config_subscriber.h
+++ b/logd/src/logd/config_subscriber.h
@@ -4,6 +4,7 @@
#include "forwarder.h"
#include <logd/config-logd.h>
#include <vespa/config/config.h>
+#include <vespa/fnet/frt/supervisor.h>
namespace logdemon {
@@ -16,6 +17,8 @@ class ConfigSubscriber {
private:
std::string _logserver_host;
int _logserver_port;
+ int _logserver_rpc_port;
+ bool _logserver_use_rpc;
int _state_port;
ForwardMap _forward_filter;
int _rotate_size;
@@ -27,6 +30,7 @@ private:
config::ConfigHandle<cloud::config::log::LogdConfig>::UP _handle;
bool _has_available;
bool _need_new_forwarder;
+ FRT_Supervisor _supervisor;
public:
bool checkAvailable();
diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp
index e515f463db4..654266f18af 100644
--- a/logd/src/logd/rpc_forwarder.cpp
+++ b/logd/src/logd/rpc_forwarder.cpp
@@ -17,6 +17,39 @@ using vespalib::make_string;
namespace logdemon {
+namespace {
+
+class GuardedRequest {
+private:
+ FRT_RPCRequest* _request;
+public:
+ GuardedRequest()
+ : _request(new FRT_RPCRequest())
+ {}
+ ~GuardedRequest() {
+ _request->SubRef();
+ }
+ FRT_RPCRequest& operator*() const { return *_request; }
+ FRT_RPCRequest* get() const { return _request; }
+ FRT_RPCRequest* operator->() const { return get(); }
+};
+
+}
+
+void
+RpcForwarder::ping_logserver()
+{
+ GuardedRequest request;
+ request->SetMethodName("frt.rpc.ping");
+ _target->InvokeSync(request.get(), _rpc_timeout_secs);
+ if (!request->CheckReturnTypes("")) {
+ auto error_msg = make_string("Error in rpc ping to logserver ('%s'): '%s'",
+ _connection_spec.c_str(), request->GetErrorMessage());
+ LOG(debug, "%s", error_msg.c_str());
+ throw ConnectionException(error_msg);
+ }
+}
+
RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor,
const vespalib::string &hostname, int rpc_port,
double rpc_timeout_secs, size_t max_messages_per_request)
@@ -30,6 +63,7 @@ RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor,
_forward_filter()
{
_target = supervisor.GetTarget(_connection_spec.c_str());
+ ping_logserver();
}
RpcForwarder::~RpcForwarder()
@@ -93,25 +127,6 @@ RpcForwarder::forwardLine(std::string_view line)
}
}
-namespace {
-
-class GuardedRequest {
-private:
- FRT_RPCRequest* _request;
-public:
- GuardedRequest()
- : _request(new FRT_RPCRequest())
- {}
- ~GuardedRequest() {
- _request->SubRef();
- }
- FRT_RPCRequest& operator*() const { return *_request; }
- FRT_RPCRequest* get() const { return _request; }
- FRT_RPCRequest* operator->() const { return get(); }
-};
-
-}
-
void
RpcForwarder::flush()
{
@@ -124,13 +139,15 @@ RpcForwarder::flush()
encode_log_request(proto_request, *request);
_target->InvokeSync(request.get(), _rpc_timeout_secs);
if (!request->CheckReturnTypes("bix")) {
- auto error_msg = make_string("Error in rpc reply from '%s': '%s'",
+ auto error_msg = make_string("Error in rpc reply from logserver ('%s'): '%s'",
_connection_spec.c_str(), request->GetErrorMessage());
+ LOG(warning, "%s", error_msg.c_str());
throw ConnectionException(error_msg);
}
ProtoConverter::ProtoLogResponse proto_response;
if (!decode_log_response(*request, proto_response)) {
- auto error_msg = make_string("Error during decoding of protobuf response from '%s'", _connection_spec.c_str());
+ auto error_msg = make_string("Error during decoding of protobuf response from logserver ('%s')", _connection_spec.c_str());
+ LOG(warning, "%s", error_msg.c_str());
throw DecodeException(error_msg);
}
_messages.clear();
diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h
index e1e4bc95e06..0f224561830 100644
--- a/logd/src/logd/rpc_forwarder.h
+++ b/logd/src/logd/rpc_forwarder.h
@@ -26,6 +26,7 @@ private:
int _bad_lines;
ForwardMap _forward_filter;
+ void ping_logserver();
public:
RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor,
diff --git a/logd/src/logd/watcher.cpp b/logd/src/logd/watcher.cpp
index d0633a22c99..a92ad456e9f 100644
--- a/logd/src/logd/watcher.cpp
+++ b/logd/src/logd/watcher.cpp
@@ -288,6 +288,7 @@ Watcher::watchfile()
}
}
+ _forwarder.flush();
dcf.saveState(already);
if (_confsubscriber.checkAvailable()) {