diff options
author | Geir Storli <geirst@verizonmedia.com> | 2019-03-29 15:49:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-29 15:49:54 +0100 |
commit | 7ef26aab4837ec6ab76370a17b7fa236591cbc5d (patch) | |
tree | 9b92a9e23d7a0a42f3da34b560b42a45cf1596cd /logd/src | |
parent | 9e592933ccfdf4833e6ff2413e05074e033b68ca (diff) | |
parent | b794e044098a4ba88335bc7e0686603ed50a41f0 (diff) |
Merge pull request #8961 from vespa-engine/geirst/logd-prepare-for-rpc-forwarder-integration
Prepare ConfigSubscriber to support instantiation of either legacy or…
Diffstat (limited to 'logd/src')
-rw-r--r-- | logd/src/apps/logd/main.cpp | 24 | ||||
-rw-r--r-- | logd/src/logd/config_subscriber.cpp | 107 | ||||
-rw-r--r-- | logd/src/logd/config_subscriber.h | 29 | ||||
-rw-r--r-- | logd/src/logd/forwarder.h | 2 | ||||
-rw-r--r-- | logd/src/logd/legacy_forwarder.cpp | 74 | ||||
-rw-r--r-- | logd/src/logd/legacy_forwarder.h | 21 | ||||
-rw-r--r-- | logd/src/logd/rpc_forwarder.h | 1 | ||||
-rw-r--r-- | logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp | 28 |
8 files changed, 161 insertions, 125 deletions
diff --git a/logd/src/apps/logd/main.cpp b/logd/src/apps/logd/main.cpp index 229f385d416..067894e96d9 100644 --- a/logd/src/apps/logd/main.cpp +++ b/logd/src/apps/logd/main.cpp @@ -21,7 +21,6 @@ int main(int, char**) { StateReporter stateReporter; Metrics metrics(stateReporter.metrics()); - LegacyForwarder fwd(metrics); EV_STARTED("logdemon"); @@ -30,26 +29,27 @@ int main(int, char**) const char *cfid = getenv("VESPA_CONFIG_ID"); try { - ConfigSubscriber subscriber(fwd, config::ConfigUri(cfid)); + config::ConfigUri config_uri(cfid); + ConfigSubscriber subscriber(config_uri); + Forwarder::UP forwarder; int sleepcount = 0; while (true) { - Watcher watcher(subscriber, fwd); - try { subscriber.latch(); + if (!forwarder || subscriber.need_new_forwarder()) { + forwarder = subscriber.make_forwarder(metrics); + } + Watcher watcher(subscriber, *forwarder); + stateReporter.setStatePort(subscriber.getStatePort()); stateReporter.gotConf(subscriber.generation()); - int fd = subscriber.getservfd(); - if (fd >= 0) { - sleepcount = 0 ; // connection OK, reset sleep time - watcher.watchfile(); - } else { - LOG(spam, "bad fd in subscriber"); - } + + sleepcount = 0 ; // connection OK, reset sleep time + watcher.watchfile(); } catch (ConnectionException& ex) { LOG(debug, "connection exception: %s", ex.what()); - subscriber.closeConn(); + forwarder.reset(); } if (catcher.receivedStopSignal()) { throw SigTermException("caught signal"); diff --git a/logd/src/logd/config_subscriber.cpp b/logd/src/logd/config_subscriber.cpp index 33d662a6ca2..d686052f6a1 100644 --- a/logd/src/logd/config_subscriber.cpp +++ b/logd/src/logd/config_subscriber.cpp @@ -18,15 +18,15 @@ void ConfigSubscriber::configure(std::unique_ptr<LogdConfig> cfg) { const LogdConfig &newconf(*cfg); - if (newconf.logserver.host != _logServer) { - _logServer = newconf.logserver.host; - _needToConnect = true; + if (newconf.logserver.host != _logserver_host) { + _logserver_host = newconf.logserver.host; + _need_new_forwarder = true; } if (newconf.logserver.use != _use_logserver) { _use_logserver = newconf.logserver.use; - _needToConnect = true; + _need_new_forwarder = true; } - _statePort = newconf.stateport; + _state_port = newconf.stateport; ForwardMap forwardMap; forwardMap[Logger::fatal] = newconf.loglevel.fatal.forward; @@ -37,11 +37,14 @@ ConfigSubscriber::configure(std::unique_ptr<LogdConfig> cfg) forwardMap[Logger::event] = newconf.loglevel.event.forward; forwardMap[Logger::debug] = newconf.loglevel.debug.forward; forwardMap[Logger::spam] = newconf.loglevel.spam.forward; - _fw.setForwardMap(forwardMap); + if (forwardMap != _forward_filter) { + _forward_filter = forwardMap; + _need_new_forwarder = true; + } - if (newconf.logserver.port != _logPort) { - _logPort = newconf.logserver.port; - _needToConnect = true; + if (newconf.logserver.port != _logserver_port) { + _logserver_port = newconf.logserver.port; + _need_new_forwarder = true; } if (newconf.rotate.size > 0) { _rotate_size = newconf.rotate.size; @@ -69,9 +72,9 @@ bool ConfigSubscriber::checkAvailable() { if (_subscriber.nextGeneration(0)) { - _hasAvailable = true; + _has_available = true; } - return _hasAvailable; + return _has_available; } void @@ -79,88 +82,48 @@ ConfigSubscriber::latch() { if (checkAvailable()) { configure(_handle->getConfig()); - _hasAvailable = false; - } - if (_needToConnect) { - if (_use_logserver) { - connectToLogserver(); - } else { - connectToDevNull(); - } - } -} - -void -ConfigSubscriber::connectToLogserver() -{ - int newfd = makeconn(_logServer.c_str(), _logPort); - if (newfd >= 0) { - resetFileDescriptor(newfd); - LOG(debug, "connected to logserver at %s:%d", _logServer.c_str(), _logPort); - } else { - LOG(debug, "could not connect to %s:%d", _logServer.c_str(), _logPort); - } -} - -void -ConfigSubscriber::connectToDevNull() -{ - int newfd = open("/dev/null", O_RDWR); - if (newfd >= 0) { - resetFileDescriptor(newfd); - LOG(debug, "opened /dev/null for read/write"); - } else { - LOG(debug, "error opening /dev/null (%d): %s", newfd, strerror(newfd)); + _has_available = false; } } -void -ConfigSubscriber::resetFileDescriptor(int newfd) -{ - if (_logserverfd >= 0) { - close(_logserverfd); - } - _logserverfd = newfd; - _fw.setLogserverFD(newfd); - _needToConnect = false; -} - -void -ConfigSubscriber::closeConn() -{ - close(_logserverfd); - _logserverfd = -1; - _needToConnect = true; -} - -ConfigSubscriber::ConfigSubscriber(LegacyForwarder &fw, const config::ConfigUri &configUri) - : _logServer(), - _logPort(0), - _logserverfd(-1), - _statePort(0), +ConfigSubscriber::ConfigSubscriber(const config::ConfigUri& configUri) + : _logserver_host(), + _logserver_port(0), + _state_port(0), + _forward_filter(), _rotate_size(INT_MAX), _rotate_age(INT_MAX), _remove_meg(INT_MAX), _remove_age(3650), _use_logserver(true), - _fw(fw), _subscriber(configUri.getContext()), _handle(), - _hasAvailable(false), - _needToConnect(true) + _has_available(false), + _need_new_forwarder(true) { _handle = _subscriber.subscribe<LogdConfig>(configUri.getConfigId()); _subscriber.nextConfig(0); configure(_handle->getConfig()); - LOG(debug, "got logServer %s", _logServer.c_str()); + LOG(debug, "got logServer %s", _logserver_host.c_str()); LOG(debug, "got handle %p", _handle.get()); } ConfigSubscriber::~ConfigSubscriber() { - LOG(debug, "forget logServer %s", _logServer.c_str()); + LOG(debug, "forget logServer %s", _logserver_host.c_str()); LOG(debug, "done ~ConfSub()"); } +std::unique_ptr<Forwarder> +ConfigSubscriber::make_forwarder(Metrics& metrics) +{ + LegacyForwarder::UP result = _use_logserver ? + LegacyForwarder::to_logserver(metrics, _logserver_host, _logserver_port) : + LegacyForwarder::to_dev_null(metrics); + result->setForwardMap(_forward_filter); + _need_new_forwarder = false; + return result; +} + } diff --git a/logd/src/logd/config_subscriber.h b/logd/src/logd/config_subscriber.h index 1650e46a17a..ab5d11478fb 100644 --- a/logd/src/logd/config_subscriber.h +++ b/logd/src/logd/config_subscriber.h @@ -1,52 +1,49 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "forwarder.h" #include <logd/config-logd.h> #include <vespa/config/config.h> namespace logdemon { -class LegacyForwarder; +class Metrics; /** * Class used to subscribe for logd config. */ class ConfigSubscriber { private: - std::string _logServer; - int _logPort; - int _logserverfd; - int _statePort; + std::string _logserver_host; + int _logserver_port; + int _state_port; + ForwardMap _forward_filter; int _rotate_size; int _rotate_age; int _remove_meg; int _remove_age; bool _use_logserver; - LegacyForwarder& _fw; config::ConfigSubscriber _subscriber; config::ConfigHandle<cloud::config::log::LogdConfig>::UP _handle; - bool _hasAvailable; - bool _needToConnect; + bool _has_available; + bool _need_new_forwarder; - void connectToLogserver(); - void connectToDevNull(); - void resetFileDescriptor(int newfd); public: bool checkAvailable(); void latch(); - void closeConn(); ConfigSubscriber(const ConfigSubscriber& other) = delete; ConfigSubscriber& operator=(const ConfigSubscriber& other) = delete; - ConfigSubscriber(LegacyForwarder &fw, const config::ConfigUri &configUri); + ConfigSubscriber(const config::ConfigUri& configUri); ~ConfigSubscriber(); - int getStatePort() const { return _statePort; } - int getservfd() const { return _logserverfd; } + int getStatePort() const { return _state_port; } int getRotateSize() const { return _rotate_size; } int getRotateAge() const { return _rotate_age; } int getRemoveMegabytes() const { return _remove_meg; } int getRemoveAge() const { return _remove_age; } - bool useLogserver() const { return _use_logserver; } + + bool need_new_forwarder() const { return _need_new_forwarder; } + std::unique_ptr<Forwarder> make_forwarder(Metrics& metrics); void configure(std::unique_ptr<cloud::config::log::LogdConfig> cfg); size_t generation() const { return _subscriber.getGeneration(); } diff --git a/logd/src/logd/forwarder.h b/logd/src/logd/forwarder.h index 93cfdb3de9f..c43a3263bf0 100644 --- a/logd/src/logd/forwarder.h +++ b/logd/src/logd/forwarder.h @@ -4,6 +4,7 @@ #include <vespa/log/log.h> #include <map> +#include <memory> #include <string_view> namespace logdemon { @@ -16,6 +17,7 @@ using ForwardMap = std::map<ns_log::Logger::LogLevel, bool>; */ class Forwarder { public: + using UP = std::unique_ptr<Forwarder>; virtual ~Forwarder() {} virtual void sendMode() = 0; virtual void forwardLine(std::string_view log_line) = 0; diff --git a/logd/src/logd/legacy_forwarder.cpp b/logd/src/logd/legacy_forwarder.cpp index 6b2f430d388..32be2bd72c8 100644 --- a/logd/src/logd/legacy_forwarder.cpp +++ b/logd/src/logd/legacy_forwarder.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "conn.h" #include "exceptions.h" #include "legacy_forwarder.h" #include "metrics.h" @@ -7,6 +8,8 @@ #include <vespa/log/exceptions.h> #include <vespa/vespalib/component/vtag.h> #include <vespa/vespalib/locale/c.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <fcntl.h> #include <unistd.h> #include <vespa/log/log.h> @@ -17,21 +20,82 @@ using ns_log::BadLogLineException; using ns_log::LogMessage; using ns_log::Logger; using LogLevel = Logger::LogLevel; +using vespalib::make_string; namespace logdemon { +void +LegacyForwarder::connect_to_logserver(const vespalib::string& logserver_host, int logserver_port) +{ + int new_fd = makeconn(logserver_host.c_str(), logserver_port); + if (new_fd >= 0) { + LOG(debug, "Connected to logserver at %s:%d", logserver_host.c_str(), logserver_port); + _logserver_fd = new_fd; + } else { + auto error_msg = make_string("Could not connect to %s:%d", logserver_host.c_str(), logserver_port); + LOG(debug, "%s", error_msg.c_str()); + throw ConnectionException(error_msg); + } +} + +void +LegacyForwarder::connect_to_dev_null() +{ + int new_fd = open("/dev/null", O_RDWR); + if (new_fd >= 0) { + LOG(debug, "Opened /dev/null for read/write"); + _logserver_fd = new_fd; + } else { + auto error_msg = make_string("Error opening /dev/null (%d): %s", new_fd, strerror(new_fd)); + LOG(debug, "%s", error_msg.c_str()); + throw ConnectionException(error_msg); + } +} + LegacyForwarder::LegacyForwarder(Metrics &metrics) - : _logserverfd(-1), + : _metrics(metrics), + _logserver_fd(-1), _forwardMap(), _badLines(0) -{} -LegacyForwarder::~LegacyForwarder() = default; +{ +} + +LegacyForwarder::UP +LegacyForwarder::to_logserver(Metrics& metrics, const vespalib::string& logserver_host, int logserver_port) +{ + LegacyForwarder::UP result(new LegacyForwarder(metrics)); + result->connect_to_logserver(logserver_host, logserver_port); + return result; +} + +LegacyForwarder::UP +LegacyForwarder::to_dev_null(Metrics& metrics) +{ + LegacyForwarder::UP result(new LegacyForwarder(metrics)); + result->connect_to_dev_null(); + return result; +} + +LegacyForwarder::UP +LegacyForwarder::to_open_file(Metrics& metrics, int file_desc) +{ + LegacyForwarder::UP result(new LegacyForwarder(metrics)); + result->_logserver_fd = file_desc; + return result; +} + +LegacyForwarder::~LegacyForwarder() +{ + if (_logserver_fd >= 0) { + close(_logserver_fd); + } +} void LegacyForwarder::forwardText(const char *text, int len) { - int wsize = write(_logserverfd, text, len); + int wsize = write(_logserver_fd, text, len); if (wsize != len) { if (wsize > 0) { @@ -60,7 +124,7 @@ LegacyForwarder::sendMode() void LegacyForwarder::forwardLine(std::string_view line) { - assert(_logserverfd >= 0); + assert(_logserver_fd >= 0); assert (line.size() > 0); assert (line.size() < 1024*1024); assert (line[line.size() - 1] == '\n'); diff --git a/logd/src/logd/legacy_forwarder.h b/logd/src/logd/legacy_forwarder.h index db3bf84fd4f..bbd7ed840ae 100644 --- a/logd/src/logd/legacy_forwarder.h +++ b/logd/src/logd/legacy_forwarder.h @@ -2,6 +2,8 @@ #pragma once #include "forwarder.h" +#include <vespa/vespalib/stllike/string.h> +#include <memory> namespace logdemon { @@ -12,8 +14,8 @@ struct Metrics; */ class LegacyForwarder : public Forwarder { private: - int _logserverfd; Metrics &_metrics; + int _logserver_fd; ForwardMap _forwardMap; int _badLines; const char *copystr(const char *b, const char *e) { @@ -23,16 +25,23 @@ private: ret[len] = '\0'; return ret; } + void connect_to_logserver(const vespalib::string& logserver_host, int logserver_port); + void connect_to_dev_null(); bool parseLine(std::string_view line); -public: + void forwardText(const char *text, int len); LegacyForwarder(Metrics &metrics); + +public: + using UP = std::unique_ptr<LegacyForwarder>; + static LegacyForwarder::UP to_logserver(Metrics& metrics, const vespalib::string& logserver_host, int logserver_port); + static LegacyForwarder::UP to_dev_null(Metrics& metrics); + static LegacyForwarder::UP to_open_file(Metrics& metrics, int file_desc); ~LegacyForwarder(); - void forwardText(const char *text, int len); + void setForwardMap(const ForwardMap& forwardMap) { _forwardMap = forwardMap; } + + // Implements Forwarder void forwardLine(std::string_view line) override; void flush() override {} - void setForwardMap(const ForwardMap & forwardMap) { _forwardMap = forwardMap; } - void setLogserverFD(int fd) { _logserverfd = fd; } - int getLogserverFD() { return _logserverfd; } void sendMode() override; int badLines() const override { return _badLines; } void resetBadLines() override { _badLines = 0; } diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h index 3212da08195..e1e4bc95e06 100644 --- a/logd/src/logd/rpc_forwarder.h +++ b/logd/src/logd/rpc_forwarder.h @@ -26,6 +26,7 @@ private: int _bad_lines; ForwardMap _forward_filter; + public: RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor, const vespalib::string& logserver_host, int logserver_rpc_port, diff --git a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp index c6702e8bc67..83a54a24365 100644 --- a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp +++ b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp @@ -12,22 +12,24 @@ using ns_log::Logger; using namespace logdemon; +std::shared_ptr<vespalib::metrics::MetricsManager> dummy = vespalib::metrics::DummyMetricsManager::create(); +Metrics m(dummy); + struct ForwardFixture { - LegacyForwarder &forwarder; + LegacyForwarder::UP forwarder; int fd; const std::string fname; const std::string logLine; - ForwardFixture(LegacyForwarder &fw, const std::string &fileName) - : forwarder(fw), + ForwardFixture(const std::string& fileName) + : forwarder(), fd(-1), fname(fileName), logLine(createLogLine()) { fd = open(fileName.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0777); - forwarder.setLogserverFD(fd); + forwarder = LegacyForwarder::to_open_file(m, fd); } ~ForwardFixture() { - close(fd); } const std::string createLogLine() { @@ -40,7 +42,7 @@ struct ForwardFixture { } void verifyForward(bool doForward) { - forwarder.forwardLine(logLine); + forwarder->forwardLine(logLine); fsync(fd); int rfd = open(fname.c_str(), O_RDONLY); char *buffer[2048]; @@ -51,21 +53,19 @@ struct ForwardFixture { } }; -std::shared_ptr<vespalib::metrics::MetricsManager> dummy = vespalib::metrics::DummyMetricsManager::create(); -Metrics m(dummy); -TEST_FF("require that forwarder forwards if set", LegacyForwarder(m), ForwardFixture(f1, "forward.txt")) { +TEST_F("require that forwarder forwards if set", ForwardFixture("forward.txt")) { ForwardMap forwardMap; forwardMap[Logger::event] = true; - f1.setForwardMap(forwardMap); - f2.verifyForward(true); + f1.forwarder->setForwardMap(forwardMap); + f1.verifyForward(true); } -TEST_FF("require that forwarder does not forward if not set", LegacyForwarder(m), ForwardFixture(f1, "forward.txt")) { +TEST_F("require that forwarder does not forward if not set", ForwardFixture("forward.txt")) { ForwardMap forwardMap; forwardMap[Logger::event] = false; - f1.setForwardMap(forwardMap); - f2.verifyForward(false); + f1.forwarder->setForwardMap(forwardMap); + f1.verifyForward(false); } TEST_MAIN() { TEST_RUN_ALL(); } |