diff options
author | Geir Storli <geirst@verizonmedia.com> | 2019-03-29 10:43:30 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-29 10:43:30 +0100 |
commit | c02161249c161ee65f5a0be018439152b6cc8763 (patch) | |
tree | a50e9c6cb1c235a7994907e5ac6fd58d4ac9d367 /logd | |
parent | a748f744ef4f14bac32c9c62e71512162325e3b9 (diff) | |
parent | 24e3f9a0da95f11d0603a727e1ea0c860422de2e (diff) |
Merge pull request #8937 from vespa-engine/geirst/logd-implement-rpc-forwarder
Geirst/logd implement rpc forwarder
Diffstat (limited to 'logd')
-rw-r--r-- | logd/CMakeLists.txt | 1 | ||||
-rw-r--r-- | logd/src/logd/CMakeLists.txt | 1 | ||||
-rw-r--r-- | logd/src/logd/exceptions.h | 7 | ||||
-rw-r--r-- | logd/src/logd/forwarder.h | 10 | ||||
-rw-r--r-- | logd/src/logd/legacy_forwarder.cpp | 18 | ||||
-rw-r--r-- | logd/src/logd/legacy_forwarder.h | 11 | ||||
-rw-r--r-- | logd/src/logd/proto_converter.h | 3 | ||||
-rw-r--r-- | logd/src/logd/rpc_forwarder.cpp | 151 | ||||
-rw-r--r-- | logd/src/logd/rpc_forwarder.h | 45 | ||||
-rw-r--r-- | logd/src/logd/watcher.cpp | 2 | ||||
-rw-r--r-- | logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp | 5 | ||||
-rw-r--r-- | logd/src/tests/rpc_forwarder/CMakeLists.txt | 9 | ||||
-rw-r--r-- | logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp | 248 |
13 files changed, 487 insertions, 24 deletions
diff --git a/logd/CMakeLists.txt b/logd/CMakeLists.txt index 9a5fdf32841..e16d3bd5179 100644 --- a/logd/CMakeLists.txt +++ b/logd/CMakeLists.txt @@ -17,6 +17,7 @@ vespa_define_module( src/tests/legacy_forwarder src/tests/proto_converter src/tests/rotate + src/tests/rpc_forwarder ) vespa_install_script(src/apps/retention/retention-enforcer.sh vespa-retention-enforcer sbin) diff --git a/logd/src/logd/CMakeLists.txt b/logd/src/logd/CMakeLists.txt index baf52f1d5d8..629c7b1637a 100644 --- a/logd/src/logd/CMakeLists.txt +++ b/logd/src/logd/CMakeLists.txt @@ -16,6 +16,7 @@ vespa_add_library(logd STATIC legacy_forwarder.cpp metrics.cpp proto_converter.cpp + rpc_forwarder.cpp state_reporter.cpp watcher.cpp ${logd_PROTOBUF_SRCS} diff --git a/logd/src/logd/exceptions.h b/logd/src/logd/exceptions.h index b2ff3516b69..82e8b570c3b 100644 --- a/logd/src/logd/exceptions.h +++ b/logd/src/logd/exceptions.h @@ -18,7 +18,12 @@ public: class ConnectionException : public MsgException { public: - ConnectionException(const char *s) : MsgException(s) {} + ConnectionException(const std::string& msg) : MsgException(msg) {} +}; + +class DecodeException : public MsgException { +public: + DecodeException(const std::string& msg) : MsgException(msg) {} }; class SigTermException : public MsgException diff --git a/logd/src/logd/forwarder.h b/logd/src/logd/forwarder.h index a0a1c5f1ea5..93cfdb3de9f 100644 --- a/logd/src/logd/forwarder.h +++ b/logd/src/logd/forwarder.h @@ -2,8 +2,15 @@ #pragma once +#include <vespa/log/log.h> +#include <map> +#include <string_view> + namespace logdemon { +// Mapping saying if a level should be forwarded or not +using ForwardMap = std::map<ns_log::Logger::LogLevel, bool>; + /** * Interface used to forward log lines to something. */ @@ -11,7 +18,8 @@ class Forwarder { public: virtual ~Forwarder() {} virtual void sendMode() = 0; - virtual void forwardLine(const char *line, const char *eol) = 0; + virtual void forwardLine(std::string_view log_line) = 0; + virtual void flush() = 0; virtual int badLines() const = 0; virtual void resetBadLines() = 0; }; diff --git a/logd/src/logd/legacy_forwarder.cpp b/logd/src/logd/legacy_forwarder.cpp index b8b93a03530..6b2f430d388 100644 --- a/logd/src/logd/legacy_forwarder.cpp +++ b/logd/src/logd/legacy_forwarder.cpp @@ -58,26 +58,24 @@ LegacyForwarder::sendMode() } void -LegacyForwarder::forwardLine(const char *line, const char *eol) +LegacyForwarder::forwardLine(std::string_view line) { - int linelen = eol - line; - assert(_logserverfd >= 0); - assert (linelen > 0); - assert (linelen < 1024*1024); - assert (line[linelen - 1] == '\n'); + assert (line.size() > 0); + assert (line.size() < 1024*1024); + assert (line[line.size() - 1] == '\n'); - if (parseline(line, eol)) { - forwardText(line, linelen); + if (parseLine(line)) { + forwardText(line.data(), line.size()); } } bool -LegacyForwarder::parseline(const char *linestart, const char *lineend) +LegacyForwarder::parseLine(std::string_view line) { LogMessage message; try { - message.parse_log_line(std::string_view(linestart, lineend - linestart)); + message.parse_log_line(line); } catch (BadLogLineException &e) { LOG(spam, "bad logline: %s", e.what()); ++_badLines; diff --git a/logd/src/logd/legacy_forwarder.h b/logd/src/logd/legacy_forwarder.h index 81a93ce1d50..db3bf84fd4f 100644 --- a/logd/src/logd/legacy_forwarder.h +++ b/logd/src/logd/legacy_forwarder.h @@ -2,15 +2,9 @@ #pragma once #include "forwarder.h" -#include <vespa/log/log.h> -#include <map> -#include <unordered_set> namespace logdemon { -// Mapping saying if a level should be forwarded or not -using ForwardMap = std::map<ns_log::Logger::LogLevel, bool>; - struct Metrics; /** @@ -29,12 +23,13 @@ private: ret[len] = '\0'; return ret; } - bool parseline(const char *linestart, const char *lineend); + bool parseLine(std::string_view line); public: LegacyForwarder(Metrics &metrics); ~LegacyForwarder(); void forwardText(const char *text, int len); - void forwardLine(const char *line, const char *eol) override; + void forwardLine(std::string_view line) override; + void flush() override {} void setForwardMap(const ForwardMap & forwardMap) { _forwardMap = forwardMap; } void setLogserverFD(int fd) { _logserverfd = fd; } int getLogserverFD() { return _logserverfd; } diff --git a/logd/src/logd/proto_converter.h b/logd/src/logd/proto_converter.h index 688648b99de..88749100736 100644 --- a/logd/src/logd/proto_converter.h +++ b/logd/src/logd/proto_converter.h @@ -1,5 +1,7 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + #include "log_protocol_proto.h" #include <vespa/log/log_message.h> #include <vector> @@ -11,6 +13,7 @@ namespace logdemon { */ struct ProtoConverter { using ProtoLogRequest = logserver::protocol::protobuf::LogRequest; + using ProtoLogResponse = logserver::protocol::protobuf::LogResponse; using ProtoLogMessage = logserver::protocol::protobuf::LogMessage; static void log_messages_to_proto(const std::vector<ns_log::LogMessage>& messages, ProtoLogRequest& proto); diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp new file mode 100644 index 00000000000..e515f463db4 --- /dev/null +++ b/logd/src/logd/rpc_forwarder.cpp @@ -0,0 +1,151 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "exceptions.h" +#include "metrics.h" +#include "proto_converter.h" +#include "rpc_forwarder.h" +#include <vespa/log/exceptions.h> +#include <vespa/vespalib/util/buffer.h> +#include <vespa/vespalib/util/stringfmt.h> + +#include <vespa/log/log.h> +LOG_SETUP(".logd.rpc_forwarder"); + +using ns_log::BadLogLineException; +using ns_log::LogMessage; +using vespalib::make_string; + +namespace logdemon { + +RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor, + const vespalib::string &hostname, int rpc_port, + double rpc_timeout_secs, size_t max_messages_per_request) + : _metrics(metrics), + _connection_spec(make_string("tcp/%s:%d", hostname.c_str(), rpc_port)), + _rpc_timeout_secs(rpc_timeout_secs), + _max_messages_per_request(max_messages_per_request), + _target(), + _messages(), + _bad_lines(0), + _forward_filter() +{ + _target = supervisor.GetTarget(_connection_spec.c_str()); +} + +RpcForwarder::~RpcForwarder() +{ + _target->SubRef(); +} + +namespace { + +void +encode_log_request(const ProtoConverter::ProtoLogRequest& src, FRT_RPCRequest& dst) +{ + dst.SetMethodName("vespa.logserver.archiveLogMessages"); + auto buf = src.SerializeAsString(); + auto& params = *dst.GetParams(); + params.AddInt8(0); // '0' indicates no compression + params.AddInt32(buf.size()); + params.AddData(buf.data(), buf.size()); +} + +bool +decode_log_response(FRT_RPCRequest& src, ProtoConverter::ProtoLogResponse& dst) +{ + auto& values = *src.GetReturn(); + uint8_t encoding = values[0]._intval8; + assert(encoding == 0); // Not using compression + uint32_t uncompressed_size = values[1]._intval32; + (void) uncompressed_size; + return dst.ParseFromArray(values[2]._data._buf, values[2]._data._len); +} + +bool +should_forward_log_message(const LogMessage& message, const ForwardMap& filter) +{ + auto found = filter.find(message.level()); + if (found != filter.end()) { + return found->second; + } + return false; +} + +} + +void +RpcForwarder::forwardLine(std::string_view line) +{ + LogMessage message; + try { + message.parse_log_line(line); + } catch (BadLogLineException &e) { + LOG(spam, "Skipping bad logline: %s", e.what()); + ++_bad_lines; + return; + } + _metrics.countLine(ns_log::Logger::logLevelNames[message.level()], message.service()); + if (should_forward_log_message(message, _forward_filter)) { + _messages.push_back(std::move(message)); + if (_messages.size() == _max_messages_per_request) { + flush(); + } + } +} + +namespace { + +class GuardedRequest { +private: + FRT_RPCRequest* _request; +public: + GuardedRequest() + : _request(new FRT_RPCRequest()) + {} + ~GuardedRequest() { + _request->SubRef(); + } + FRT_RPCRequest& operator*() const { return *_request; } + FRT_RPCRequest* get() const { return _request; } + FRT_RPCRequest* operator->() const { return get(); } +}; + +} + +void +RpcForwarder::flush() +{ + if (_messages.empty()) { + return; + } + ProtoConverter::ProtoLogRequest proto_request; + ProtoConverter::log_messages_to_proto(_messages, proto_request); + GuardedRequest request; + encode_log_request(proto_request, *request); + _target->InvokeSync(request.get(), _rpc_timeout_secs); + if (!request->CheckReturnTypes("bix")) { + auto error_msg = make_string("Error in rpc reply from '%s': '%s'", + _connection_spec.c_str(), request->GetErrorMessage()); + throw ConnectionException(error_msg); + } + ProtoConverter::ProtoLogResponse proto_response; + if (!decode_log_response(*request, proto_response)) { + auto error_msg = make_string("Error during decoding of protobuf response from '%s'", _connection_spec.c_str()); + throw DecodeException(error_msg); + } + _messages.clear(); +} + +int +RpcForwarder::badLines() const +{ + return _bad_lines; +} + +void +RpcForwarder::resetBadLines() +{ + _bad_lines = 0; +} + +} diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h new file mode 100644 index 00000000000..3212da08195 --- /dev/null +++ b/logd/src/logd/rpc_forwarder.h @@ -0,0 +1,45 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "forwarder.h" +#include "proto_converter.h" +#include <vespa/log/log_message.h> +#include <vespa/fnet/frt/frt.h> +#include <vector> + +namespace logdemon { + +struct Metrics; + +/** + * Implementation of the Forwarder interface that uses RPC to send protobuf encoded log messages to the logserver. + */ +class RpcForwarder : public Forwarder { +private: + Metrics& _metrics; + vespalib::string _connection_spec; + double _rpc_timeout_secs; + size_t _max_messages_per_request; + FRT_Target* _target; + std::vector<ns_log::LogMessage> _messages; + int _bad_lines; + ForwardMap _forward_filter; + +public: + RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor, + const vespalib::string& logserver_host, int logserver_rpc_port, + double rpc_timeout_secs, size_t max_messages_per_request); + ~RpcForwarder() override; + void set_forward_filter(const ForwardMap& forward_filter) { _forward_filter = forward_filter; } + + // Implements Forwarder + void sendMode() override {} + void forwardLine(std::string_view line) override; + void flush() override; + int badLines() const override; + void resetBadLines() override; +}; + +} + diff --git a/logd/src/logd/watcher.cpp b/logd/src/logd/watcher.cpp index a047c110f32..c505d2dd235 100644 --- a/logd/src/logd/watcher.cpp +++ b/logd/src/logd/watcher.cpp @@ -220,7 +220,7 @@ Watcher::watchfile() } while (nnl != nullptr && elapsed(tickStart) < 1) { ++nnl; - _forwarder.forwardLine(l, nnl); + _forwarder.forwardLine(std::string_view(l, (nnl - l))); ssize_t wsize = nnl - l; offset += wsize; l = nnl; diff --git a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp index 3af35f9aa09..c6702e8bc67 100644 --- a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp +++ b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp @@ -40,13 +40,12 @@ struct ForwardFixture { } void verifyForward(bool doForward) { - const std::string & line(logLine); - forwarder.forwardLine(line.c_str(), line.c_str() + line.length()); + forwarder.forwardLine(logLine); fsync(fd); int rfd = open(fname.c_str(), O_RDONLY); char *buffer[2048]; ssize_t bytes = read(rfd, buffer, 2048); - ssize_t expected = doForward ? line.length() : 0; + ssize_t expected = doForward ? logLine.length() : 0; EXPECT_EQUAL(expected, bytes); close(rfd); } diff --git a/logd/src/tests/rpc_forwarder/CMakeLists.txt b/logd/src/tests/rpc_forwarder/CMakeLists.txt new file mode 100644 index 00000000000..66a30777b41 --- /dev/null +++ b/logd/src/tests/rpc_forwarder/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(logd_rpc_forwarder_test_app TEST + SOURCES + rpc_forwarder_test.cpp + DEPENDS + logd + gtest +) +vespa_add_test(NAME logd_rpc_forwarder_test_app COMMAND logd_rpc_forwarder_test_app) diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp new file mode 100644 index 00000000000..30ca5e19d44 --- /dev/null +++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp @@ -0,0 +1,248 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <logd/exceptions.h> +#include <logd/metrics.h> +#include <logd/rpc_forwarder.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/metrics/dummy_metrics_manager.h> + +using namespace logdemon; +using vespalib::metrics::DummyMetricsManager; + +void +encode_log_response(const ProtoConverter::ProtoLogResponse& src, FRT_Values& dst) +{ + auto buf = src.SerializeAsString(); + dst.AddInt8(0); + dst.AddInt32(buf.size()); + dst.AddData(buf.data(), buf.size()); +} + +bool +decode_log_request(FRT_Values& src, ProtoConverter::ProtoLogRequest& dst) +{ + uint8_t encoding = src[0]._intval8; + assert(encoding == 0); + uint32_t uncompressed_size = src[1]._intval32; + assert(uncompressed_size == src[2]._data._len); + return dst.ParseFromArray(src[2]._data._buf, src[2]._data._len); +} + +std::string garbage("garbage"); + +struct RpcServer : public FRT_Invokable { + FRT_Supervisor supervisor; + int request_count; + std::vector<std::string> messages; + bool reply_with_error; + bool reply_with_proto_response; + +public: + RpcServer() + : supervisor(), + request_count(0), + messages(), + reply_with_error(false), + reply_with_proto_response(true) + { + supervisor.Listen(0); + FRT_ReflectionBuilder builder(&supervisor); + builder.DefineMethod("vespa.logserver.archiveLogMessages", "bix", "bix", + FRT_METHOD(RpcServer::rpc_archive_log_messages), this); + supervisor.Start(); + } + ~RpcServer() { + supervisor.ShutDown(true); + } + int get_listen_port() { + return supervisor.GetListenPort(); + } + void rpc_archive_log_messages(FRT_RPCRequest* request) { + ProtoConverter::ProtoLogRequest proto_request; + ASSERT_TRUE(decode_log_request(*request->GetParams(), proto_request)); + ++request_count; + for (const auto& message : proto_request.log_messages()) { + messages.push_back(message.payload()); + } + if (reply_with_error) { + request->SetError(123, "This is a server error"); + return; + } + if (reply_with_proto_response) { + ProtoConverter::ProtoLogResponse proto_response; + encode_log_response(proto_response, *request->GetReturn()); + } else { + auto& dst = *request->GetReturn(); + dst.AddInt8(0); + dst.AddInt32(garbage.size()); + dst.AddData(garbage.data(), garbage.size()); + } + } +}; + +std::string +make_log_line(const std::string& level, const std::string& payload) +{ + return "1234.5678\tmy_host\t10/20\tmy_service\tmy_component\t" + level + "\t" + payload; +} + +struct MockMetricsManager : public DummyMetricsManager { + int add_count; + MockMetricsManager() : DummyMetricsManager(), add_count(0) {} + void add(Counter::Increment) override { + ++add_count; + } +}; + +class ClientSupervisor { +private: + FRT_Supervisor _supervisor; +public: + ClientSupervisor() + : _supervisor() + { + _supervisor.Start(); + } + ~ClientSupervisor() { + _supervisor.ShutDown(true); + } + FRT_Supervisor& get() { return _supervisor; } + +}; + +struct RpcForwarderTest : public ::testing::Test { + RpcServer server; + std::shared_ptr<MockMetricsManager> metrics_mgr; + Metrics metrics; + ClientSupervisor supervisor; + RpcForwarder forwarder; + RpcForwarderTest() + : server(), + metrics_mgr(std::make_shared<MockMetricsManager>()), + metrics(metrics_mgr), + forwarder(metrics, supervisor.get(), "localhost", server.get_listen_port(), 60.0, 3) + { + ForwardMap forward_filter; + forward_filter[ns_log::Logger::error] = true; + forward_filter[ns_log::Logger::warning] = false; + forward_filter[ns_log::Logger::info] = true; + // all other log levels are implicit false + forwarder.set_forward_filter(forward_filter); + } + void forward_line(const std::string& payload) { + forwarder.forwardLine(make_log_line("info", payload)); + } + void forward_line(const std::string& level, const std::string& payload) { + forwarder.forwardLine(make_log_line(level, payload)); + } + void forward_bad_line() { + forwarder.forwardLine("badline"); + } + void flush() { + forwarder.flush(); + } + void expect_messages() { + expect_messages(0, {}); + } + void expect_messages(int exp_request_count, const std::vector<std::string>& exp_messages) { + EXPECT_EQ(exp_request_count, server.request_count); + EXPECT_EQ(exp_messages, server.messages); + } +}; + +TEST_F(RpcForwarderTest, does_not_send_rpc_with_no_log_messages) +{ + expect_messages(); + flush(); + expect_messages(); +} + +TEST_F(RpcForwarderTest, can_send_rpc_with_single_log_message) +{ + forward_line("a"); + expect_messages(); + flush(); + expect_messages(1, {"a"}); +} + +TEST_F(RpcForwarderTest, can_send_rpc_with_multiple_log_messages) +{ + forward_line("a"); + forward_line("b"); + expect_messages(); + flush(); + expect_messages(1, {"a", "b"}); +} + +TEST_F(RpcForwarderTest, automatically_sends_rpc_when_max_messages_limit_is_reached) +{ + forward_line("a"); + forward_line("b"); + expect_messages(); + forward_line("c"); + expect_messages(1, {"a", "b", "c"}); + forward_line("d"); + expect_messages(1, {"a", "b", "c"}); + forward_line("e"); + expect_messages(1, {"a", "b", "c"}); + forward_line("f"); + expect_messages(2, {"a", "b", "c", "d", "e", "f"}); +} + +TEST_F(RpcForwarderTest, bad_log_lines_are_counted_but_not_sent) +{ + forward_line("a"); + forward_bad_line(); + EXPECT_EQ(1, forwarder.badLines()); + flush(); + expect_messages(1, {"a"}); +} + +TEST_F(RpcForwarderTest, bad_log_lines_count_can_be_reset) +{ + forward_bad_line(); + EXPECT_EQ(1, forwarder.badLines()); + forwarder.resetBadLines(); + EXPECT_EQ(0, forwarder.badLines()); +} + +TEST_F(RpcForwarderTest, metrics_are_updated_for_each_log_message) +{ + forward_line("a"); + EXPECT_EQ(1, metrics_mgr->add_count); + forward_line("b"); + EXPECT_EQ(2, metrics_mgr->add_count); +} + +TEST_F(RpcForwarderTest, log_messages_are_filtered_on_log_level) +{ + forward_line("fatal", "a"); + forward_line("error", "b"); + forward_line("warning", "c"); + forward_line("config", "d"); + forward_line("info", "e"); + forward_line("event", "f"); + forward_line("debug", "g"); + forward_line("spam", "h"); + forward_line("null", "i"); + flush(); + expect_messages(1, {"b", "e"}); + EXPECT_EQ(9, metrics_mgr->add_count); +} + +TEST_F(RpcForwarderTest, throws_when_rpc_reply_contains_errors) +{ + server.reply_with_error = true; + forward_line("a"); + EXPECT_THROW(flush(), logdemon::ConnectionException); +} + +TEST_F(RpcForwarderTest, throws_when_rpc_reply_does_not_contain_proto_response) +{ + server.reply_with_proto_response = false; + forward_line("a"); + EXPECT_THROW(flush(), logdemon::DecodeException); +} + +GTEST_MAIN_RUN_ALL_TESTS() + |