diff options
author | Geir Storli <geirst@verizonmedia.com> | 2019-03-28 09:43:40 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2019-03-28 10:03:20 +0000 |
commit | 5d22214d38aad75b0362cd64283394a0c02fb12f (patch) | |
tree | ce52a6c86aae4333cb848fff7ff2eb653ef0f9c0 /logd/src | |
parent | 949d1664e62bf29cb67193274a88991b0157b070 (diff) |
Implement initial version of forwarder that uses RPC to send protobuf encoded log messages to the logserver.
Diffstat (limited to 'logd/src')
-rw-r--r-- | logd/src/logd/CMakeLists.txt | 1 | ||||
-rw-r--r-- | logd/src/logd/exceptions.h | 7 | ||||
-rw-r--r-- | logd/src/logd/forwarder.h | 5 | ||||
-rw-r--r-- | logd/src/logd/legacy_forwarder.cpp | 18 | ||||
-rw-r--r-- | logd/src/logd/legacy_forwarder.h | 5 | ||||
-rw-r--r-- | logd/src/logd/proto_converter.h | 3 | ||||
-rw-r--r-- | logd/src/logd/rpc_forwarder.cpp | 118 | ||||
-rw-r--r-- | logd/src/logd/rpc_forwarder.h | 39 | ||||
-rw-r--r-- | logd/src/logd/watcher.cpp | 2 | ||||
-rw-r--r-- | logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp | 5 | ||||
-rw-r--r-- | logd/src/tests/rpc_forwarder/CMakeLists.txt | 9 | ||||
-rw-r--r-- | logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp | 162 |
12 files changed, 356 insertions, 18 deletions
diff --git a/logd/src/logd/CMakeLists.txt b/logd/src/logd/CMakeLists.txt index baf52f1d5d8..629c7b1637a 100644 --- a/logd/src/logd/CMakeLists.txt +++ b/logd/src/logd/CMakeLists.txt @@ -16,6 +16,7 @@ vespa_add_library(logd STATIC legacy_forwarder.cpp metrics.cpp proto_converter.cpp + rpc_forwarder.cpp state_reporter.cpp watcher.cpp ${logd_PROTOBUF_SRCS} diff --git a/logd/src/logd/exceptions.h b/logd/src/logd/exceptions.h index b2ff3516b69..82e8b570c3b 100644 --- a/logd/src/logd/exceptions.h +++ b/logd/src/logd/exceptions.h @@ -18,7 +18,12 @@ public: class ConnectionException : public MsgException { public: - ConnectionException(const char *s) : MsgException(s) {} + ConnectionException(const std::string& msg) : MsgException(msg) {} +}; + +class DecodeException : public MsgException { +public: + DecodeException(const std::string& msg) : MsgException(msg) {} }; class SigTermException : public MsgException diff --git a/logd/src/logd/forwarder.h b/logd/src/logd/forwarder.h index a0a1c5f1ea5..b26abf01658 100644 --- a/logd/src/logd/forwarder.h +++ b/logd/src/logd/forwarder.h @@ -2,6 +2,8 @@ #pragma once +#include <string_view> + namespace logdemon { /** @@ -11,7 +13,8 @@ class Forwarder { public: virtual ~Forwarder() {} virtual void sendMode() = 0; - virtual void forwardLine(const char *line, const char *eol) = 0; + virtual void forwardLine(std::string_view log_line) = 0; + virtual void flush() = 0; virtual int badLines() const = 0; virtual void resetBadLines() = 0; }; diff --git a/logd/src/logd/legacy_forwarder.cpp b/logd/src/logd/legacy_forwarder.cpp index b8b93a03530..6b2f430d388 100644 --- a/logd/src/logd/legacy_forwarder.cpp +++ b/logd/src/logd/legacy_forwarder.cpp @@ -58,26 +58,24 @@ LegacyForwarder::sendMode() } void -LegacyForwarder::forwardLine(const char *line, const char *eol) +LegacyForwarder::forwardLine(std::string_view line) { - int linelen = eol - line; - assert(_logserverfd >= 0); - assert (linelen > 0); - assert (linelen < 1024*1024); - assert (line[linelen - 1] == '\n'); + assert (line.size() > 0); + assert (line.size() < 1024*1024); + assert (line[line.size() - 1] == '\n'); - if (parseline(line, eol)) { - forwardText(line, linelen); + if (parseLine(line)) { + forwardText(line.data(), line.size()); } } bool -LegacyForwarder::parseline(const char *linestart, const char *lineend) +LegacyForwarder::parseLine(std::string_view line) { LogMessage message; try { - message.parse_log_line(std::string_view(linestart, lineend - linestart)); + message.parse_log_line(line); } catch (BadLogLineException &e) { LOG(spam, "bad logline: %s", e.what()); ++_badLines; diff --git a/logd/src/logd/legacy_forwarder.h b/logd/src/logd/legacy_forwarder.h index 81a93ce1d50..7adeabef29e 100644 --- a/logd/src/logd/legacy_forwarder.h +++ b/logd/src/logd/legacy_forwarder.h @@ -29,12 +29,13 @@ private: ret[len] = '\0'; return ret; } - bool parseline(const char *linestart, const char *lineend); + bool parseLine(std::string_view line); public: LegacyForwarder(Metrics &metrics); ~LegacyForwarder(); void forwardText(const char *text, int len); - void forwardLine(const char *line, const char *eol) override; + void forwardLine(std::string_view line) override; + void flush() override {} void setForwardMap(const ForwardMap & forwardMap) { _forwardMap = forwardMap; } void setLogserverFD(int fd) { _logserverfd = fd; } int getLogserverFD() { return _logserverfd; } diff --git a/logd/src/logd/proto_converter.h b/logd/src/logd/proto_converter.h index 688648b99de..88749100736 100644 --- a/logd/src/logd/proto_converter.h +++ b/logd/src/logd/proto_converter.h @@ -1,5 +1,7 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + #include "log_protocol_proto.h" #include <vespa/log/log_message.h> #include <vector> @@ -11,6 +13,7 @@ namespace logdemon { */ struct ProtoConverter { using ProtoLogRequest = logserver::protocol::protobuf::LogRequest; + using ProtoLogResponse = logserver::protocol::protobuf::LogResponse; using ProtoLogMessage = logserver::protocol::protobuf::LogMessage; static void log_messages_to_proto(const std::vector<ns_log::LogMessage>& messages, ProtoLogRequest& proto); diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp new file mode 100644 index 00000000000..37436c713bd --- /dev/null +++ b/logd/src/logd/rpc_forwarder.cpp @@ -0,0 +1,118 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "exceptions.h" +#include "proto_converter.h" +#include "rpc_forwarder.h" +#include <vespa/log/exceptions.h> +#include <vespa/vespalib/util/buffer.h> +#include <vespa/vespalib/util/stringfmt.h> + +#include <vespa/log/log.h> +LOG_SETUP(".logd.rpc_forwarder"); + +using ns_log::BadLogLineException; +using ns_log::LogMessage; +using vespalib::make_string; + +namespace logdemon { + +RpcForwarder::RpcForwarder(const vespalib::string &hostname, int rpc_port, + double rpc_timeout_secs, size_t max_messages_per_request) + : _connection_spec(make_string("tcp/%s:%d", hostname.c_str(), rpc_port)), + _rpc_timeout_secs(rpc_timeout_secs), + _max_messages_per_request(max_messages_per_request), + _supervisor(), + _target(), + _messages() +{ + _supervisor.Start(); + _target = _supervisor.GetTarget(_connection_spec.c_str()); +} + +RpcForwarder::~RpcForwarder() +{ + _target->SubRef(); + _supervisor.ShutDown(true); +} + +namespace { + +void +encode_log_request(const ProtoConverter::ProtoLogRequest& src, FRT_RPCRequest& dst) +{ + dst.SetMethodName("vespa.logserver.archiveLogMessages"); + auto buf = src.SerializeAsString(); + auto& params = *dst.GetParams(); + params.AddInt8(0); // '0' indicates no compression + params.AddInt32(buf.size()); + params.AddData(buf.data(), buf.size()); +} + +bool +decode_log_response(FRT_RPCRequest& src, ProtoConverter::ProtoLogResponse& dst) +{ + auto& values = *src.GetReturn(); + uint8_t encoding = values[0]._intval8; + assert(encoding == 0); // Not using compression + uint32_t uncompressed_size = values[1]._intval32; + (void) uncompressed_size; + return dst.ParseFromArray(values[2]._data._buf, values[2]._data._len); +} + +} + +void +RpcForwarder::forwardLine(std::string_view line) +{ + LogMessage message; + try { + message.parse_log_line(line); + } catch (BadLogLineException &e) { + LOG(spam, "Skipping bad logline: %s", e.what()); + return; + } + _messages.push_back(std::move(message)); + if (_messages.size() == _max_messages_per_request) { + flush(); + } +} + +void +RpcForwarder::flush() +{ + if (_messages.empty()) { + return; + } + ProtoConverter::ProtoLogRequest proto_request; + ProtoConverter::log_messages_to_proto(_messages, proto_request); + auto request = new FRT_RPCRequest(); + encode_log_request(proto_request, *request); + _target->InvokeSync(request, _rpc_timeout_secs); + if (!request->CheckReturnTypes("bix")) { + auto error_msg = make_string("Error in rpc reply from '%s': '%s'", + _connection_spec.c_str(), request->GetErrorMessage()); + request->SubRef(); + throw ConnectionException(error_msg); + } + ProtoConverter::ProtoLogResponse proto_response; + if (!decode_log_response(*request, proto_response)) { + auto error_msg = make_string("Error during decoding of protobuf response from '%s'", _connection_spec.c_str()); + request->SubRef(); + throw DecodeException(error_msg); + } + request->SubRef(); + _messages.clear(); +} + +int +RpcForwarder::badLines() const +{ + return 0; +} + +void +RpcForwarder::resetBadLines() +{ +} + +} diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h new file mode 100644 index 00000000000..65ac0b0af44 --- /dev/null +++ b/logd/src/logd/rpc_forwarder.h @@ -0,0 +1,39 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "forwarder.h" +#include "proto_converter.h" +#include <vespa/log/log_message.h> +#include <vespa/fnet/frt/frt.h> +#include <vector> + +namespace logdemon { + +/** + * Implementation of the Forwarder interface that uses RPC to send protobuf encoded log messages to the logserver. + */ +class RpcForwarder : public Forwarder { +private: + vespalib::string _connection_spec; + double _rpc_timeout_secs; + size_t _max_messages_per_request; + FRT_Supervisor _supervisor; + FRT_Target* _target; + std::vector<ns_log::LogMessage> _messages; + +public: + RpcForwarder(const vespalib::string& logserver_host, int logserver_rpc_port, + double rpc_timeout_secs, size_t max_messages_per_request); + ~RpcForwarder() override; + + // Implements Forwarder + void sendMode() override {} + void forwardLine(std::string_view line) override; + void flush() override; + int badLines() const override; + void resetBadLines() override; +}; + +} + diff --git a/logd/src/logd/watcher.cpp b/logd/src/logd/watcher.cpp index a047c110f32..c505d2dd235 100644 --- a/logd/src/logd/watcher.cpp +++ b/logd/src/logd/watcher.cpp @@ -220,7 +220,7 @@ Watcher::watchfile() } while (nnl != nullptr && elapsed(tickStart) < 1) { ++nnl; - _forwarder.forwardLine(l, nnl); + _forwarder.forwardLine(std::string_view(l, (nnl - l))); ssize_t wsize = nnl - l; offset += wsize; l = nnl; diff --git a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp index 3af35f9aa09..c6702e8bc67 100644 --- a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp +++ b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp @@ -40,13 +40,12 @@ struct ForwardFixture { } void verifyForward(bool doForward) { - const std::string & line(logLine); - forwarder.forwardLine(line.c_str(), line.c_str() + line.length()); + forwarder.forwardLine(logLine); fsync(fd); int rfd = open(fname.c_str(), O_RDONLY); char *buffer[2048]; ssize_t bytes = read(rfd, buffer, 2048); - ssize_t expected = doForward ? line.length() : 0; + ssize_t expected = doForward ? logLine.length() : 0; EXPECT_EQUAL(expected, bytes); close(rfd); } diff --git a/logd/src/tests/rpc_forwarder/CMakeLists.txt b/logd/src/tests/rpc_forwarder/CMakeLists.txt new file mode 100644 index 00000000000..66a30777b41 --- /dev/null +++ b/logd/src/tests/rpc_forwarder/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(logd_rpc_forwarder_test_app TEST + SOURCES + rpc_forwarder_test.cpp + DEPENDS + logd + gtest +) +vespa_add_test(NAME logd_rpc_forwarder_test_app COMMAND logd_rpc_forwarder_test_app) diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp new file mode 100644 index 00000000000..183132bb39d --- /dev/null +++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp @@ -0,0 +1,162 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <logd/exceptions.h> +#include <logd/rpc_forwarder.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace logdemon; + +void +encode_log_response(const ProtoConverter::ProtoLogResponse& src, FRT_Values& dst) +{ + auto buf = src.SerializeAsString(); + dst.AddInt8(0); + dst.AddInt32(buf.size()); + dst.AddData(buf.data(), buf.size()); +} + +bool +decode_log_request(FRT_Values& src, ProtoConverter::ProtoLogRequest& dst) +{ + uint8_t encoding = src[0]._intval8; + assert(encoding == 0); + uint32_t uncompressed_size = src[1]._intval32; + assert(uncompressed_size == src[2]._data._len); + return dst.ParseFromArray(src[2]._data._buf, src[2]._data._len); +} + +std::string garbage("garbage"); + +struct RpcServer : public FRT_Invokable { + FRT_Supervisor supervisor; + int request_count; + std::vector<std::string> messages; + bool reply_with_error; + bool reply_with_proto_response; + +public: + RpcServer() + : supervisor(), + request_count(0), + messages(), + reply_with_error(false), + reply_with_proto_response(true) + { + supervisor.Listen(0); + supervisor.Start(); + FRT_ReflectionBuilder builder(&supervisor); + builder.DefineMethod("vespa.logserver.archiveLogMessages", "bix", "bix", + FRT_METHOD(RpcServer::rpc_archive_log_messages), this); + } + ~RpcServer() { + supervisor.ShutDown(true); + } + int get_listen_port() { + return supervisor.GetListenPort(); + } + void rpc_archive_log_messages(FRT_RPCRequest* request) { + ProtoConverter::ProtoLogRequest proto_request; + ASSERT_TRUE(decode_log_request(*request->GetParams(), proto_request)); + ++request_count; + for (const auto& message : proto_request.log_messages()) { + messages.push_back(message.payload()); + } + if (reply_with_error) { + request->SetError(123, "This is a server error"); + return; + } + if (reply_with_proto_response) { + ProtoConverter::ProtoLogResponse proto_response; + encode_log_response(proto_response, *request->GetReturn()); + } else { + auto& dst = *request->GetReturn(); + dst.AddInt8(0); + dst.AddInt32(garbage.size()); + dst.AddData(garbage.data(), garbage.size()); + } + } +}; + +std::string +make_log_line(const std::string& payload) +{ + return "1234.5678\tmy_host\t10/20\tmy_service\tmy_component\tinfo\t" + payload; +} + +struct RpcForwarderTest : public ::testing::Test { + RpcServer server; + RpcForwarder forwarder; + RpcForwarderTest() + : forwarder("localhost", server.get_listen_port(), 60.0, 3) + { + } + void forward_line(const std::string& payload) { + forwarder.forwardLine(make_log_line(payload)); + } + void flush() { + forwarder.flush(); + } + void expect_messages() { + expect_messages(0, {}); + } + void expect_messages(int exp_request_count, const std::vector<std::string>& exp_messages) { + EXPECT_EQ(exp_request_count, server.request_count); + EXPECT_EQ(exp_messages, server.messages); + } +}; + +TEST_F(RpcForwarderTest, does_not_send_rpc_with_no_log_messages) +{ + expect_messages(); + flush(); + expect_messages(); +} + +TEST_F(RpcForwarderTest, can_send_rpc_with_single_log_message) +{ + forward_line("a"); + expect_messages(); + flush(); + expect_messages(1, {"a"}); +} + +TEST_F(RpcForwarderTest, can_send_rpc_with_multiple_log_messages) +{ + forward_line("a"); + forward_line("b"); + expect_messages(); + flush(); + expect_messages(1, {"a", "b"}); +} + +TEST_F(RpcForwarderTest, automatically_sends_rpc_when_max_messages_limit_is_reached) +{ + forward_line("a"); + forward_line("b"); + expect_messages(); + forward_line("c"); + expect_messages(1, {"a", "b", "c"}); + forward_line("d"); + expect_messages(1, {"a", "b", "c"}); + forward_line("e"); + expect_messages(1, {"a", "b", "c"}); + forward_line("f"); + expect_messages(2, {"a", "b", "c", "d", "e", "f"}); +} + +TEST_F(RpcForwarderTest, throws_when_rpc_reply_contains_errors) +{ + server.reply_with_error = true; + forward_line("a"); + EXPECT_THROW(flush(), logdemon::ConnectionException); +} + +TEST_F(RpcForwarderTest, throws_when_rpc_reply_does_not_contain_proto_response) +{ + server.reply_with_proto_response = false; + forward_line("a"); + EXPECT_THROW(flush(), logdemon::DecodeException); +} + +GTEST_MAIN_RUN_ALL_TESTS() + |