summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2019-03-29 15:49:54 +0100
committerGitHub <noreply@github.com>2019-03-29 15:49:54 +0100
commit7ef26aab4837ec6ab76370a17b7fa236591cbc5d (patch)
tree9b92a9e23d7a0a42f3da34b560b42a45cf1596cd
parent9e592933ccfdf4833e6ff2413e05074e033b68ca (diff)
parentb794e044098a4ba88335bc7e0686603ed50a41f0 (diff)
Merge pull request #8961 from vespa-engine/geirst/logd-prepare-for-rpc-forwarder-integration
Prepare ConfigSubscriber to support instantiation of either legacy or…
-rw-r--r--logd/src/apps/logd/main.cpp24
-rw-r--r--logd/src/logd/config_subscriber.cpp107
-rw-r--r--logd/src/logd/config_subscriber.h29
-rw-r--r--logd/src/logd/forwarder.h2
-rw-r--r--logd/src/logd/legacy_forwarder.cpp74
-rw-r--r--logd/src/logd/legacy_forwarder.h21
-rw-r--r--logd/src/logd/rpc_forwarder.h1
-rw-r--r--logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp28
8 files changed, 161 insertions, 125 deletions
diff --git a/logd/src/apps/logd/main.cpp b/logd/src/apps/logd/main.cpp
index 229f385d416..067894e96d9 100644
--- a/logd/src/apps/logd/main.cpp
+++ b/logd/src/apps/logd/main.cpp
@@ -21,7 +21,6 @@ int main(int, char**)
{
StateReporter stateReporter;
Metrics metrics(stateReporter.metrics());
- LegacyForwarder fwd(metrics);
EV_STARTED("logdemon");
@@ -30,26 +29,27 @@ int main(int, char**)
const char *cfid = getenv("VESPA_CONFIG_ID");
try {
- ConfigSubscriber subscriber(fwd, config::ConfigUri(cfid));
+ config::ConfigUri config_uri(cfid);
+ ConfigSubscriber subscriber(config_uri);
+ Forwarder::UP forwarder;
int sleepcount = 0;
while (true) {
- Watcher watcher(subscriber, fwd);
-
try {
subscriber.latch();
+ if (!forwarder || subscriber.need_new_forwarder()) {
+ forwarder = subscriber.make_forwarder(metrics);
+ }
+ Watcher watcher(subscriber, *forwarder);
+
stateReporter.setStatePort(subscriber.getStatePort());
stateReporter.gotConf(subscriber.generation());
- int fd = subscriber.getservfd();
- if (fd >= 0) {
- sleepcount = 0 ; // connection OK, reset sleep time
- watcher.watchfile();
- } else {
- LOG(spam, "bad fd in subscriber");
- }
+
+ sleepcount = 0 ; // connection OK, reset sleep time
+ watcher.watchfile();
} catch (ConnectionException& ex) {
LOG(debug, "connection exception: %s", ex.what());
- subscriber.closeConn();
+ forwarder.reset();
}
if (catcher.receivedStopSignal()) {
throw SigTermException("caught signal");
diff --git a/logd/src/logd/config_subscriber.cpp b/logd/src/logd/config_subscriber.cpp
index 33d662a6ca2..d686052f6a1 100644
--- a/logd/src/logd/config_subscriber.cpp
+++ b/logd/src/logd/config_subscriber.cpp
@@ -18,15 +18,15 @@ void
ConfigSubscriber::configure(std::unique_ptr<LogdConfig> cfg)
{
const LogdConfig &newconf(*cfg);
- if (newconf.logserver.host != _logServer) {
- _logServer = newconf.logserver.host;
- _needToConnect = true;
+ if (newconf.logserver.host != _logserver_host) {
+ _logserver_host = newconf.logserver.host;
+ _need_new_forwarder = true;
}
if (newconf.logserver.use != _use_logserver) {
_use_logserver = newconf.logserver.use;
- _needToConnect = true;
+ _need_new_forwarder = true;
}
- _statePort = newconf.stateport;
+ _state_port = newconf.stateport;
ForwardMap forwardMap;
forwardMap[Logger::fatal] = newconf.loglevel.fatal.forward;
@@ -37,11 +37,14 @@ ConfigSubscriber::configure(std::unique_ptr<LogdConfig> cfg)
forwardMap[Logger::event] = newconf.loglevel.event.forward;
forwardMap[Logger::debug] = newconf.loglevel.debug.forward;
forwardMap[Logger::spam] = newconf.loglevel.spam.forward;
- _fw.setForwardMap(forwardMap);
+ if (forwardMap != _forward_filter) {
+ _forward_filter = forwardMap;
+ _need_new_forwarder = true;
+ }
- if (newconf.logserver.port != _logPort) {
- _logPort = newconf.logserver.port;
- _needToConnect = true;
+ if (newconf.logserver.port != _logserver_port) {
+ _logserver_port = newconf.logserver.port;
+ _need_new_forwarder = true;
}
if (newconf.rotate.size > 0) {
_rotate_size = newconf.rotate.size;
@@ -69,9 +72,9 @@ bool
ConfigSubscriber::checkAvailable()
{
if (_subscriber.nextGeneration(0)) {
- _hasAvailable = true;
+ _has_available = true;
}
- return _hasAvailable;
+ return _has_available;
}
void
@@ -79,88 +82,48 @@ ConfigSubscriber::latch()
{
if (checkAvailable()) {
configure(_handle->getConfig());
- _hasAvailable = false;
- }
- if (_needToConnect) {
- if (_use_logserver) {
- connectToLogserver();
- } else {
- connectToDevNull();
- }
- }
-}
-
-void
-ConfigSubscriber::connectToLogserver()
-{
- int newfd = makeconn(_logServer.c_str(), _logPort);
- if (newfd >= 0) {
- resetFileDescriptor(newfd);
- LOG(debug, "connected to logserver at %s:%d", _logServer.c_str(), _logPort);
- } else {
- LOG(debug, "could not connect to %s:%d", _logServer.c_str(), _logPort);
- }
-}
-
-void
-ConfigSubscriber::connectToDevNull()
-{
- int newfd = open("/dev/null", O_RDWR);
- if (newfd >= 0) {
- resetFileDescriptor(newfd);
- LOG(debug, "opened /dev/null for read/write");
- } else {
- LOG(debug, "error opening /dev/null (%d): %s", newfd, strerror(newfd));
+ _has_available = false;
}
}
-void
-ConfigSubscriber::resetFileDescriptor(int newfd)
-{
- if (_logserverfd >= 0) {
- close(_logserverfd);
- }
- _logserverfd = newfd;
- _fw.setLogserverFD(newfd);
- _needToConnect = false;
-}
-
-void
-ConfigSubscriber::closeConn()
-{
- close(_logserverfd);
- _logserverfd = -1;
- _needToConnect = true;
-}
-
-ConfigSubscriber::ConfigSubscriber(LegacyForwarder &fw, const config::ConfigUri &configUri)
- : _logServer(),
- _logPort(0),
- _logserverfd(-1),
- _statePort(0),
+ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri)
+ : _logserver_host(),
+ _logserver_port(0),
+ _state_port(0),
+ _forward_filter(),
_rotate_size(INT_MAX),
_rotate_age(INT_MAX),
_remove_meg(INT_MAX),
_remove_age(3650),
_use_logserver(true),
- _fw(fw),
_subscriber(configUri.getContext()),
_handle(),
- _hasAvailable(false),
- _needToConnect(true)
+ _has_available(false),
+ _need_new_forwarder(true)
{
_handle = _subscriber.subscribe<LogdConfig>(configUri.getConfigId());
_subscriber.nextConfig(0);
configure(_handle->getConfig());
- LOG(debug, "got logServer %s", _logServer.c_str());
+ LOG(debug, "got logServer %s", _logserver_host.c_str());
LOG(debug, "got handle %p", _handle.get());
}
ConfigSubscriber::~ConfigSubscriber()
{
- LOG(debug, "forget logServer %s", _logServer.c_str());
+ LOG(debug, "forget logServer %s", _logserver_host.c_str());
LOG(debug, "done ~ConfSub()");
}
+std::unique_ptr<Forwarder>
+ConfigSubscriber::make_forwarder(Metrics& metrics)
+{
+ LegacyForwarder::UP result = _use_logserver ?
+ LegacyForwarder::to_logserver(metrics, _logserver_host, _logserver_port) :
+ LegacyForwarder::to_dev_null(metrics);
+ result->setForwardMap(_forward_filter);
+ _need_new_forwarder = false;
+ return result;
+}
+
}
diff --git a/logd/src/logd/config_subscriber.h b/logd/src/logd/config_subscriber.h
index 1650e46a17a..ab5d11478fb 100644
--- a/logd/src/logd/config_subscriber.h
+++ b/logd/src/logd/config_subscriber.h
@@ -1,52 +1,49 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "forwarder.h"
#include <logd/config-logd.h>
#include <vespa/config/config.h>
namespace logdemon {
-class LegacyForwarder;
+class Metrics;
/**
* Class used to subscribe for logd config.
*/
class ConfigSubscriber {
private:
- std::string _logServer;
- int _logPort;
- int _logserverfd;
- int _statePort;
+ std::string _logserver_host;
+ int _logserver_port;
+ int _state_port;
+ ForwardMap _forward_filter;
int _rotate_size;
int _rotate_age;
int _remove_meg;
int _remove_age;
bool _use_logserver;
- LegacyForwarder& _fw;
config::ConfigSubscriber _subscriber;
config::ConfigHandle<cloud::config::log::LogdConfig>::UP _handle;
- bool _hasAvailable;
- bool _needToConnect;
+ bool _has_available;
+ bool _need_new_forwarder;
- void connectToLogserver();
- void connectToDevNull();
- void resetFileDescriptor(int newfd);
public:
bool checkAvailable();
void latch();
- void closeConn();
ConfigSubscriber(const ConfigSubscriber& other) = delete;
ConfigSubscriber& operator=(const ConfigSubscriber& other) = delete;
- ConfigSubscriber(LegacyForwarder &fw, const config::ConfigUri &configUri);
+ ConfigSubscriber(const config::ConfigUri& configUri);
~ConfigSubscriber();
- int getStatePort() const { return _statePort; }
- int getservfd() const { return _logserverfd; }
+ int getStatePort() const { return _state_port; }
int getRotateSize() const { return _rotate_size; }
int getRotateAge() const { return _rotate_age; }
int getRemoveMegabytes() const { return _remove_meg; }
int getRemoveAge() const { return _remove_age; }
- bool useLogserver() const { return _use_logserver; }
+
+ bool need_new_forwarder() const { return _need_new_forwarder; }
+ std::unique_ptr<Forwarder> make_forwarder(Metrics& metrics);
void configure(std::unique_ptr<cloud::config::log::LogdConfig> cfg);
size_t generation() const { return _subscriber.getGeneration(); }
diff --git a/logd/src/logd/forwarder.h b/logd/src/logd/forwarder.h
index 93cfdb3de9f..c43a3263bf0 100644
--- a/logd/src/logd/forwarder.h
+++ b/logd/src/logd/forwarder.h
@@ -4,6 +4,7 @@
#include <vespa/log/log.h>
#include <map>
+#include <memory>
#include <string_view>
namespace logdemon {
@@ -16,6 +17,7 @@ using ForwardMap = std::map<ns_log::Logger::LogLevel, bool>;
*/
class Forwarder {
public:
+ using UP = std::unique_ptr<Forwarder>;
virtual ~Forwarder() {}
virtual void sendMode() = 0;
virtual void forwardLine(std::string_view log_line) = 0;
diff --git a/logd/src/logd/legacy_forwarder.cpp b/logd/src/logd/legacy_forwarder.cpp
index 6b2f430d388..32be2bd72c8 100644
--- a/logd/src/logd/legacy_forwarder.cpp
+++ b/logd/src/logd/legacy_forwarder.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "conn.h"
#include "exceptions.h"
#include "legacy_forwarder.h"
#include "metrics.h"
@@ -7,6 +8,8 @@
#include <vespa/log/exceptions.h>
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/locale/c.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <fcntl.h>
#include <unistd.h>
#include <vespa/log/log.h>
@@ -17,21 +20,82 @@ using ns_log::BadLogLineException;
using ns_log::LogMessage;
using ns_log::Logger;
using LogLevel = Logger::LogLevel;
+using vespalib::make_string;
namespace logdemon {
+void
+LegacyForwarder::connect_to_logserver(const vespalib::string& logserver_host, int logserver_port)
+{
+ int new_fd = makeconn(logserver_host.c_str(), logserver_port);
+ if (new_fd >= 0) {
+ LOG(debug, "Connected to logserver at %s:%d", logserver_host.c_str(), logserver_port);
+ _logserver_fd = new_fd;
+ } else {
+ auto error_msg = make_string("Could not connect to %s:%d", logserver_host.c_str(), logserver_port);
+ LOG(debug, "%s", error_msg.c_str());
+ throw ConnectionException(error_msg);
+ }
+}
+
+void
+LegacyForwarder::connect_to_dev_null()
+{
+ int new_fd = open("/dev/null", O_RDWR);
+ if (new_fd >= 0) {
+ LOG(debug, "Opened /dev/null for read/write");
+ _logserver_fd = new_fd;
+ } else {
+ auto error_msg = make_string("Error opening /dev/null (%d): %s", new_fd, strerror(new_fd));
+ LOG(debug, "%s", error_msg.c_str());
+ throw ConnectionException(error_msg);
+ }
+}
+
LegacyForwarder::LegacyForwarder(Metrics &metrics)
- : _logserverfd(-1),
+ :
_metrics(metrics),
+ _logserver_fd(-1),
_forwardMap(),
_badLines(0)
-{}
-LegacyForwarder::~LegacyForwarder() = default;
+{
+}
+
+LegacyForwarder::UP
+LegacyForwarder::to_logserver(Metrics& metrics, const vespalib::string& logserver_host, int logserver_port)
+{
+ LegacyForwarder::UP result(new LegacyForwarder(metrics));
+ result->connect_to_logserver(logserver_host, logserver_port);
+ return result;
+}
+
+LegacyForwarder::UP
+LegacyForwarder::to_dev_null(Metrics& metrics)
+{
+ LegacyForwarder::UP result(new LegacyForwarder(metrics));
+ result->connect_to_dev_null();
+ return result;
+}
+
+LegacyForwarder::UP
+LegacyForwarder::to_open_file(Metrics& metrics, int file_desc)
+{
+ LegacyForwarder::UP result(new LegacyForwarder(metrics));
+ result->_logserver_fd = file_desc;
+ return result;
+}
+
+LegacyForwarder::~LegacyForwarder()
+{
+ if (_logserver_fd >= 0) {
+ close(_logserver_fd);
+ }
+}
void
LegacyForwarder::forwardText(const char *text, int len)
{
- int wsize = write(_logserverfd, text, len);
+ int wsize = write(_logserver_fd, text, len);
if (wsize != len) {
if (wsize > 0) {
@@ -60,7 +124,7 @@ LegacyForwarder::sendMode()
void
LegacyForwarder::forwardLine(std::string_view line)
{
- assert(_logserverfd >= 0);
+ assert(_logserver_fd >= 0);
assert (line.size() > 0);
assert (line.size() < 1024*1024);
assert (line[line.size() - 1] == '\n');
diff --git a/logd/src/logd/legacy_forwarder.h b/logd/src/logd/legacy_forwarder.h
index db3bf84fd4f..bbd7ed840ae 100644
--- a/logd/src/logd/legacy_forwarder.h
+++ b/logd/src/logd/legacy_forwarder.h
@@ -2,6 +2,8 @@
#pragma once
#include "forwarder.h"
+#include <vespa/vespalib/stllike/string.h>
+#include <memory>
namespace logdemon {
@@ -12,8 +14,8 @@ struct Metrics;
*/
class LegacyForwarder : public Forwarder {
private:
- int _logserverfd;
Metrics &_metrics;
+ int _logserver_fd;
ForwardMap _forwardMap;
int _badLines;
const char *copystr(const char *b, const char *e) {
@@ -23,16 +25,23 @@ private:
ret[len] = '\0';
return ret;
}
+ void connect_to_logserver(const vespalib::string& logserver_host, int logserver_port);
+ void connect_to_dev_null();
bool parseLine(std::string_view line);
-public:
+ void forwardText(const char *text, int len);
LegacyForwarder(Metrics &metrics);
+
+public:
+ using UP = std::unique_ptr<LegacyForwarder>;
+ static LegacyForwarder::UP to_logserver(Metrics& metrics, const vespalib::string& logserver_host, int logserver_port);
+ static LegacyForwarder::UP to_dev_null(Metrics& metrics);
+ static LegacyForwarder::UP to_open_file(Metrics& metrics, int file_desc);
~LegacyForwarder();
- void forwardText(const char *text, int len);
+ void setForwardMap(const ForwardMap& forwardMap) { _forwardMap = forwardMap; }
+
+ // Implements Forwarder
void forwardLine(std::string_view line) override;
void flush() override {}
- void setForwardMap(const ForwardMap & forwardMap) { _forwardMap = forwardMap; }
- void setLogserverFD(int fd) { _logserverfd = fd; }
- int getLogserverFD() { return _logserverfd; }
void sendMode() override;
int badLines() const override { return _badLines; }
void resetBadLines() override { _badLines = 0; }
diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h
index 3212da08195..e1e4bc95e06 100644
--- a/logd/src/logd/rpc_forwarder.h
+++ b/logd/src/logd/rpc_forwarder.h
@@ -26,6 +26,7 @@ private:
int _bad_lines;
ForwardMap _forward_filter;
+
public:
RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor,
const vespalib::string& logserver_host, int logserver_rpc_port,
diff --git a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
index c6702e8bc67..83a54a24365 100644
--- a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
+++ b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
@@ -12,22 +12,24 @@
using ns_log::Logger;
using namespace logdemon;
+std::shared_ptr<vespalib::metrics::MetricsManager> dummy = vespalib::metrics::DummyMetricsManager::create();
+Metrics m(dummy);
+
struct ForwardFixture {
- LegacyForwarder &forwarder;
+ LegacyForwarder::UP forwarder;
int fd;
const std::string fname;
const std::string logLine;
- ForwardFixture(LegacyForwarder &fw, const std::string &fileName)
- : forwarder(fw),
+ ForwardFixture(const std::string& fileName)
+ : forwarder(),
fd(-1),
fname(fileName),
logLine(createLogLine())
{
fd = open(fileName.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0777);
- forwarder.setLogserverFD(fd);
+ forwarder = LegacyForwarder::to_open_file(m, fd);
}
~ForwardFixture() {
- close(fd);
}
const std::string createLogLine() {
@@ -40,7 +42,7 @@ struct ForwardFixture {
}
void verifyForward(bool doForward) {
- forwarder.forwardLine(logLine);
+ forwarder->forwardLine(logLine);
fsync(fd);
int rfd = open(fname.c_str(), O_RDONLY);
char *buffer[2048];
@@ -51,21 +53,19 @@ struct ForwardFixture {
}
};
-std::shared_ptr<vespalib::metrics::MetricsManager> dummy = vespalib::metrics::DummyMetricsManager::create();
-Metrics m(dummy);
-TEST_FF("require that forwarder forwards if set", LegacyForwarder(m), ForwardFixture(f1, "forward.txt")) {
+TEST_F("require that forwarder forwards if set", ForwardFixture("forward.txt")) {
ForwardMap forwardMap;
forwardMap[Logger::event] = true;
- f1.setForwardMap(forwardMap);
- f2.verifyForward(true);
+ f1.forwarder->setForwardMap(forwardMap);
+ f1.verifyForward(true);
}
-TEST_FF("require that forwarder does not forward if not set", LegacyForwarder(m), ForwardFixture(f1, "forward.txt")) {
+TEST_F("require that forwarder does not forward if not set", ForwardFixture("forward.txt")) {
ForwardMap forwardMap;
forwardMap[Logger::event] = false;
- f1.setForwardMap(forwardMap);
- f2.verifyForward(false);
+ f1.forwarder->setForwardMap(forwardMap);
+ f1.verifyForward(false);
}
TEST_MAIN() { TEST_RUN_ALL(); }