summaryrefslogtreecommitdiffstats
path: root/logd
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2019-03-28 09:43:40 +0000
committerGeir Storli <geirst@verizonmedia.com>2019-03-28 10:03:20 +0000
commit5d22214d38aad75b0362cd64283394a0c02fb12f (patch)
treece52a6c86aae4333cb848fff7ff2eb653ef0f9c0 /logd
parent949d1664e62bf29cb67193274a88991b0157b070 (diff)
Implement initial version of forwarder that uses RPC to send protobuf encoded log messages to the logserver.
Diffstat (limited to 'logd')
-rw-r--r--logd/CMakeLists.txt1
-rw-r--r--logd/src/logd/CMakeLists.txt1
-rw-r--r--logd/src/logd/exceptions.h7
-rw-r--r--logd/src/logd/forwarder.h5
-rw-r--r--logd/src/logd/legacy_forwarder.cpp18
-rw-r--r--logd/src/logd/legacy_forwarder.h5
-rw-r--r--logd/src/logd/proto_converter.h3
-rw-r--r--logd/src/logd/rpc_forwarder.cpp118
-rw-r--r--logd/src/logd/rpc_forwarder.h39
-rw-r--r--logd/src/logd/watcher.cpp2
-rw-r--r--logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp5
-rw-r--r--logd/src/tests/rpc_forwarder/CMakeLists.txt9
-rw-r--r--logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp162
13 files changed, 357 insertions, 18 deletions
diff --git a/logd/CMakeLists.txt b/logd/CMakeLists.txt
index 9a5fdf32841..e16d3bd5179 100644
--- a/logd/CMakeLists.txt
+++ b/logd/CMakeLists.txt
@@ -17,6 +17,7 @@ vespa_define_module(
src/tests/legacy_forwarder
src/tests/proto_converter
src/tests/rotate
+ src/tests/rpc_forwarder
)
vespa_install_script(src/apps/retention/retention-enforcer.sh vespa-retention-enforcer sbin)
diff --git a/logd/src/logd/CMakeLists.txt b/logd/src/logd/CMakeLists.txt
index baf52f1d5d8..629c7b1637a 100644
--- a/logd/src/logd/CMakeLists.txt
+++ b/logd/src/logd/CMakeLists.txt
@@ -16,6 +16,7 @@ vespa_add_library(logd STATIC
legacy_forwarder.cpp
metrics.cpp
proto_converter.cpp
+ rpc_forwarder.cpp
state_reporter.cpp
watcher.cpp
${logd_PROTOBUF_SRCS}
diff --git a/logd/src/logd/exceptions.h b/logd/src/logd/exceptions.h
index b2ff3516b69..82e8b570c3b 100644
--- a/logd/src/logd/exceptions.h
+++ b/logd/src/logd/exceptions.h
@@ -18,7 +18,12 @@ public:
class ConnectionException : public MsgException
{
public:
- ConnectionException(const char *s) : MsgException(s) {}
+ ConnectionException(const std::string& msg) : MsgException(msg) {}
+};
+
+class DecodeException : public MsgException {
+public:
+ DecodeException(const std::string& msg) : MsgException(msg) {}
};
class SigTermException : public MsgException
diff --git a/logd/src/logd/forwarder.h b/logd/src/logd/forwarder.h
index a0a1c5f1ea5..b26abf01658 100644
--- a/logd/src/logd/forwarder.h
+++ b/logd/src/logd/forwarder.h
@@ -2,6 +2,8 @@
#pragma once
+#include <string_view>
+
namespace logdemon {
/**
@@ -11,7 +13,8 @@ class Forwarder {
public:
virtual ~Forwarder() {}
virtual void sendMode() = 0;
- virtual void forwardLine(const char *line, const char *eol) = 0;
+ virtual void forwardLine(std::string_view log_line) = 0;
+ virtual void flush() = 0;
virtual int badLines() const = 0;
virtual void resetBadLines() = 0;
};
diff --git a/logd/src/logd/legacy_forwarder.cpp b/logd/src/logd/legacy_forwarder.cpp
index b8b93a03530..6b2f430d388 100644
--- a/logd/src/logd/legacy_forwarder.cpp
+++ b/logd/src/logd/legacy_forwarder.cpp
@@ -58,26 +58,24 @@ LegacyForwarder::sendMode()
}
void
-LegacyForwarder::forwardLine(const char *line, const char *eol)
+LegacyForwarder::forwardLine(std::string_view line)
{
- int linelen = eol - line;
-
assert(_logserverfd >= 0);
- assert (linelen > 0);
- assert (linelen < 1024*1024);
- assert (line[linelen - 1] == '\n');
+ assert (line.size() > 0);
+ assert (line.size() < 1024*1024);
+ assert (line[line.size() - 1] == '\n');
- if (parseline(line, eol)) {
- forwardText(line, linelen);
+ if (parseLine(line)) {
+ forwardText(line.data(), line.size());
}
}
bool
-LegacyForwarder::parseline(const char *linestart, const char *lineend)
+LegacyForwarder::parseLine(std::string_view line)
{
LogMessage message;
try {
- message.parse_log_line(std::string_view(linestart, lineend - linestart));
+ message.parse_log_line(line);
} catch (BadLogLineException &e) {
LOG(spam, "bad logline: %s", e.what());
++_badLines;
diff --git a/logd/src/logd/legacy_forwarder.h b/logd/src/logd/legacy_forwarder.h
index 81a93ce1d50..7adeabef29e 100644
--- a/logd/src/logd/legacy_forwarder.h
+++ b/logd/src/logd/legacy_forwarder.h
@@ -29,12 +29,13 @@ private:
ret[len] = '\0';
return ret;
}
- bool parseline(const char *linestart, const char *lineend);
+ bool parseLine(std::string_view line);
public:
LegacyForwarder(Metrics &metrics);
~LegacyForwarder();
void forwardText(const char *text, int len);
- void forwardLine(const char *line, const char *eol) override;
+ void forwardLine(std::string_view line) override;
+ void flush() override {}
void setForwardMap(const ForwardMap & forwardMap) { _forwardMap = forwardMap; }
void setLogserverFD(int fd) { _logserverfd = fd; }
int getLogserverFD() { return _logserverfd; }
diff --git a/logd/src/logd/proto_converter.h b/logd/src/logd/proto_converter.h
index 688648b99de..88749100736 100644
--- a/logd/src/logd/proto_converter.h
+++ b/logd/src/logd/proto_converter.h
@@ -1,5 +1,7 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
#include "log_protocol_proto.h"
#include <vespa/log/log_message.h>
#include <vector>
@@ -11,6 +13,7 @@ namespace logdemon {
*/
struct ProtoConverter {
using ProtoLogRequest = logserver::protocol::protobuf::LogRequest;
+ using ProtoLogResponse = logserver::protocol::protobuf::LogResponse;
using ProtoLogMessage = logserver::protocol::protobuf::LogMessage;
static void log_messages_to_proto(const std::vector<ns_log::LogMessage>& messages, ProtoLogRequest& proto);
diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp
new file mode 100644
index 00000000000..37436c713bd
--- /dev/null
+++ b/logd/src/logd/rpc_forwarder.cpp
@@ -0,0 +1,118 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "exceptions.h"
+#include "proto_converter.h"
+#include "rpc_forwarder.h"
+#include <vespa/log/exceptions.h>
+#include <vespa/vespalib/util/buffer.h>
+#include <vespa/vespalib/util/stringfmt.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".logd.rpc_forwarder");
+
+using ns_log::BadLogLineException;
+using ns_log::LogMessage;
+using vespalib::make_string;
+
+namespace logdemon {
+
+RpcForwarder::RpcForwarder(const vespalib::string &hostname, int rpc_port,
+ double rpc_timeout_secs, size_t max_messages_per_request)
+ : _connection_spec(make_string("tcp/%s:%d", hostname.c_str(), rpc_port)),
+ _rpc_timeout_secs(rpc_timeout_secs),
+ _max_messages_per_request(max_messages_per_request),
+ _supervisor(),
+ _target(),
+ _messages()
+{
+ _supervisor.Start();
+ _target = _supervisor.GetTarget(_connection_spec.c_str());
+}
+
+RpcForwarder::~RpcForwarder()
+{
+ _target->SubRef();
+ _supervisor.ShutDown(true);
+}
+
+namespace {
+
+void
+encode_log_request(const ProtoConverter::ProtoLogRequest& src, FRT_RPCRequest& dst)
+{
+ dst.SetMethodName("vespa.logserver.archiveLogMessages");
+ auto buf = src.SerializeAsString();
+ auto& params = *dst.GetParams();
+ params.AddInt8(0); // '0' indicates no compression
+ params.AddInt32(buf.size());
+ params.AddData(buf.data(), buf.size());
+}
+
+bool
+decode_log_response(FRT_RPCRequest& src, ProtoConverter::ProtoLogResponse& dst)
+{
+ auto& values = *src.GetReturn();
+ uint8_t encoding = values[0]._intval8;
+ assert(encoding == 0); // Not using compression
+ uint32_t uncompressed_size = values[1]._intval32;
+ (void) uncompressed_size;
+ return dst.ParseFromArray(values[2]._data._buf, values[2]._data._len);
+}
+
+}
+
+void
+RpcForwarder::forwardLine(std::string_view line)
+{
+ LogMessage message;
+ try {
+ message.parse_log_line(line);
+ } catch (BadLogLineException &e) {
+ LOG(spam, "Skipping bad logline: %s", e.what());
+ return;
+ }
+ _messages.push_back(std::move(message));
+ if (_messages.size() == _max_messages_per_request) {
+ flush();
+ }
+}
+
+void
+RpcForwarder::flush()
+{
+ if (_messages.empty()) {
+ return;
+ }
+ ProtoConverter::ProtoLogRequest proto_request;
+ ProtoConverter::log_messages_to_proto(_messages, proto_request);
+ auto request = new FRT_RPCRequest();
+ encode_log_request(proto_request, *request);
+ _target->InvokeSync(request, _rpc_timeout_secs);
+ if (!request->CheckReturnTypes("bix")) {
+ auto error_msg = make_string("Error in rpc reply from '%s': '%s'",
+ _connection_spec.c_str(), request->GetErrorMessage());
+ request->SubRef();
+ throw ConnectionException(error_msg);
+ }
+ ProtoConverter::ProtoLogResponse proto_response;
+ if (!decode_log_response(*request, proto_response)) {
+ auto error_msg = make_string("Error during decoding of protobuf response from '%s'", _connection_spec.c_str());
+ request->SubRef();
+ throw DecodeException(error_msg);
+ }
+ request->SubRef();
+ _messages.clear();
+}
+
+int
+RpcForwarder::badLines() const
+{
+ return 0;
+}
+
+void
+RpcForwarder::resetBadLines()
+{
+}
+
+}
diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h
new file mode 100644
index 00000000000..65ac0b0af44
--- /dev/null
+++ b/logd/src/logd/rpc_forwarder.h
@@ -0,0 +1,39 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "forwarder.h"
+#include "proto_converter.h"
+#include <vespa/log/log_message.h>
+#include <vespa/fnet/frt/frt.h>
+#include <vector>
+
+namespace logdemon {
+
+/**
+ * Implementation of the Forwarder interface that uses RPC to send protobuf encoded log messages to the logserver.
+ */
+class RpcForwarder : public Forwarder {
+private:
+ vespalib::string _connection_spec;
+ double _rpc_timeout_secs;
+ size_t _max_messages_per_request;
+ FRT_Supervisor _supervisor;
+ FRT_Target* _target;
+ std::vector<ns_log::LogMessage> _messages;
+
+public:
+ RpcForwarder(const vespalib::string& logserver_host, int logserver_rpc_port,
+ double rpc_timeout_secs, size_t max_messages_per_request);
+ ~RpcForwarder() override;
+
+ // Implements Forwarder
+ void sendMode() override {}
+ void forwardLine(std::string_view line) override;
+ void flush() override;
+ int badLines() const override;
+ void resetBadLines() override;
+};
+
+}
+
diff --git a/logd/src/logd/watcher.cpp b/logd/src/logd/watcher.cpp
index a047c110f32..c505d2dd235 100644
--- a/logd/src/logd/watcher.cpp
+++ b/logd/src/logd/watcher.cpp
@@ -220,7 +220,7 @@ Watcher::watchfile()
}
while (nnl != nullptr && elapsed(tickStart) < 1) {
++nnl;
- _forwarder.forwardLine(l, nnl);
+ _forwarder.forwardLine(std::string_view(l, (nnl - l)));
ssize_t wsize = nnl - l;
offset += wsize;
l = nnl;
diff --git a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
index 3af35f9aa09..c6702e8bc67 100644
--- a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
+++ b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
@@ -40,13 +40,12 @@ struct ForwardFixture {
}
void verifyForward(bool doForward) {
- const std::string & line(logLine);
- forwarder.forwardLine(line.c_str(), line.c_str() + line.length());
+ forwarder.forwardLine(logLine);
fsync(fd);
int rfd = open(fname.c_str(), O_RDONLY);
char *buffer[2048];
ssize_t bytes = read(rfd, buffer, 2048);
- ssize_t expected = doForward ? line.length() : 0;
+ ssize_t expected = doForward ? logLine.length() : 0;
EXPECT_EQUAL(expected, bytes);
close(rfd);
}
diff --git a/logd/src/tests/rpc_forwarder/CMakeLists.txt b/logd/src/tests/rpc_forwarder/CMakeLists.txt
new file mode 100644
index 00000000000..66a30777b41
--- /dev/null
+++ b/logd/src/tests/rpc_forwarder/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(logd_rpc_forwarder_test_app TEST
+ SOURCES
+ rpc_forwarder_test.cpp
+ DEPENDS
+ logd
+ gtest
+)
+vespa_add_test(NAME logd_rpc_forwarder_test_app COMMAND logd_rpc_forwarder_test_app)
diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
new file mode 100644
index 00000000000..183132bb39d
--- /dev/null
+++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
@@ -0,0 +1,162 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <logd/exceptions.h>
+#include <logd/rpc_forwarder.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace logdemon;
+
+void
+encode_log_response(const ProtoConverter::ProtoLogResponse& src, FRT_Values& dst)
+{
+ auto buf = src.SerializeAsString();
+ dst.AddInt8(0);
+ dst.AddInt32(buf.size());
+ dst.AddData(buf.data(), buf.size());
+}
+
+bool
+decode_log_request(FRT_Values& src, ProtoConverter::ProtoLogRequest& dst)
+{
+ uint8_t encoding = src[0]._intval8;
+ assert(encoding == 0);
+ uint32_t uncompressed_size = src[1]._intval32;
+ assert(uncompressed_size == src[2]._data._len);
+ return dst.ParseFromArray(src[2]._data._buf, src[2]._data._len);
+}
+
+std::string garbage("garbage");
+
+struct RpcServer : public FRT_Invokable {
+ FRT_Supervisor supervisor;
+ int request_count;
+ std::vector<std::string> messages;
+ bool reply_with_error;
+ bool reply_with_proto_response;
+
+public:
+ RpcServer()
+ : supervisor(),
+ request_count(0),
+ messages(),
+ reply_with_error(false),
+ reply_with_proto_response(true)
+ {
+ supervisor.Listen(0);
+ supervisor.Start();
+ FRT_ReflectionBuilder builder(&supervisor);
+ builder.DefineMethod("vespa.logserver.archiveLogMessages", "bix", "bix",
+ FRT_METHOD(RpcServer::rpc_archive_log_messages), this);
+ }
+ ~RpcServer() {
+ supervisor.ShutDown(true);
+ }
+ int get_listen_port() {
+ return supervisor.GetListenPort();
+ }
+ void rpc_archive_log_messages(FRT_RPCRequest* request) {
+ ProtoConverter::ProtoLogRequest proto_request;
+ ASSERT_TRUE(decode_log_request(*request->GetParams(), proto_request));
+ ++request_count;
+ for (const auto& message : proto_request.log_messages()) {
+ messages.push_back(message.payload());
+ }
+ if (reply_with_error) {
+ request->SetError(123, "This is a server error");
+ return;
+ }
+ if (reply_with_proto_response) {
+ ProtoConverter::ProtoLogResponse proto_response;
+ encode_log_response(proto_response, *request->GetReturn());
+ } else {
+ auto& dst = *request->GetReturn();
+ dst.AddInt8(0);
+ dst.AddInt32(garbage.size());
+ dst.AddData(garbage.data(), garbage.size());
+ }
+ }
+};
+
+std::string
+make_log_line(const std::string& payload)
+{
+ return "1234.5678\tmy_host\t10/20\tmy_service\tmy_component\tinfo\t" + payload;
+}
+
+struct RpcForwarderTest : public ::testing::Test {
+ RpcServer server;
+ RpcForwarder forwarder;
+ RpcForwarderTest()
+ : forwarder("localhost", server.get_listen_port(), 60.0, 3)
+ {
+ }
+ void forward_line(const std::string& payload) {
+ forwarder.forwardLine(make_log_line(payload));
+ }
+ void flush() {
+ forwarder.flush();
+ }
+ void expect_messages() {
+ expect_messages(0, {});
+ }
+ void expect_messages(int exp_request_count, const std::vector<std::string>& exp_messages) {
+ EXPECT_EQ(exp_request_count, server.request_count);
+ EXPECT_EQ(exp_messages, server.messages);
+ }
+};
+
+TEST_F(RpcForwarderTest, does_not_send_rpc_with_no_log_messages)
+{
+ expect_messages();
+ flush();
+ expect_messages();
+}
+
+TEST_F(RpcForwarderTest, can_send_rpc_with_single_log_message)
+{
+ forward_line("a");
+ expect_messages();
+ flush();
+ expect_messages(1, {"a"});
+}
+
+TEST_F(RpcForwarderTest, can_send_rpc_with_multiple_log_messages)
+{
+ forward_line("a");
+ forward_line("b");
+ expect_messages();
+ flush();
+ expect_messages(1, {"a", "b"});
+}
+
+TEST_F(RpcForwarderTest, automatically_sends_rpc_when_max_messages_limit_is_reached)
+{
+ forward_line("a");
+ forward_line("b");
+ expect_messages();
+ forward_line("c");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("d");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("e");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("f");
+ expect_messages(2, {"a", "b", "c", "d", "e", "f"});
+}
+
+TEST_F(RpcForwarderTest, throws_when_rpc_reply_contains_errors)
+{
+ server.reply_with_error = true;
+ forward_line("a");
+ EXPECT_THROW(flush(), logdemon::ConnectionException);
+}
+
+TEST_F(RpcForwarderTest, throws_when_rpc_reply_does_not_contain_proto_response)
+{
+ server.reply_with_proto_response = false;
+ forward_line("a");
+ EXPECT_THROW(flush(), logdemon::DecodeException);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
+