aboutsummaryrefslogtreecommitdiffstats
path: root/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp')
-rw-r--r--logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp248
1 files changed, 248 insertions, 0 deletions
diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
new file mode 100644
index 00000000000..30ca5e19d44
--- /dev/null
+++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
@@ -0,0 +1,248 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <logd/exceptions.h>
+#include <logd/metrics.h>
+#include <logd/rpc_forwarder.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/metrics/dummy_metrics_manager.h>
+
+using namespace logdemon;
+using vespalib::metrics::DummyMetricsManager;
+
+void
+encode_log_response(const ProtoConverter::ProtoLogResponse& src, FRT_Values& dst)
+{
+ auto buf = src.SerializeAsString();
+ dst.AddInt8(0);
+ dst.AddInt32(buf.size());
+ dst.AddData(buf.data(), buf.size());
+}
+
+bool
+decode_log_request(FRT_Values& src, ProtoConverter::ProtoLogRequest& dst)
+{
+ uint8_t encoding = src[0]._intval8;
+ assert(encoding == 0);
+ uint32_t uncompressed_size = src[1]._intval32;
+ assert(uncompressed_size == src[2]._data._len);
+ return dst.ParseFromArray(src[2]._data._buf, src[2]._data._len);
+}
+
+std::string garbage("garbage");
+
+struct RpcServer : public FRT_Invokable {
+ FRT_Supervisor supervisor;
+ int request_count;
+ std::vector<std::string> messages;
+ bool reply_with_error;
+ bool reply_with_proto_response;
+
+public:
+ RpcServer()
+ : supervisor(),
+ request_count(0),
+ messages(),
+ reply_with_error(false),
+ reply_with_proto_response(true)
+ {
+ supervisor.Listen(0);
+ FRT_ReflectionBuilder builder(&supervisor);
+ builder.DefineMethod("vespa.logserver.archiveLogMessages", "bix", "bix",
+ FRT_METHOD(RpcServer::rpc_archive_log_messages), this);
+ supervisor.Start();
+ }
+ ~RpcServer() {
+ supervisor.ShutDown(true);
+ }
+ int get_listen_port() {
+ return supervisor.GetListenPort();
+ }
+ void rpc_archive_log_messages(FRT_RPCRequest* request) {
+ ProtoConverter::ProtoLogRequest proto_request;
+ ASSERT_TRUE(decode_log_request(*request->GetParams(), proto_request));
+ ++request_count;
+ for (const auto& message : proto_request.log_messages()) {
+ messages.push_back(message.payload());
+ }
+ if (reply_with_error) {
+ request->SetError(123, "This is a server error");
+ return;
+ }
+ if (reply_with_proto_response) {
+ ProtoConverter::ProtoLogResponse proto_response;
+ encode_log_response(proto_response, *request->GetReturn());
+ } else {
+ auto& dst = *request->GetReturn();
+ dst.AddInt8(0);
+ dst.AddInt32(garbage.size());
+ dst.AddData(garbage.data(), garbage.size());
+ }
+ }
+};
+
+std::string
+make_log_line(const std::string& level, const std::string& payload)
+{
+ return "1234.5678\tmy_host\t10/20\tmy_service\tmy_component\t" + level + "\t" + payload;
+}
+
+struct MockMetricsManager : public DummyMetricsManager {
+ int add_count;
+ MockMetricsManager() : DummyMetricsManager(), add_count(0) {}
+ void add(Counter::Increment) override {
+ ++add_count;
+ }
+};
+
+class ClientSupervisor {
+private:
+ FRT_Supervisor _supervisor;
+public:
+ ClientSupervisor()
+ : _supervisor()
+ {
+ _supervisor.Start();
+ }
+ ~ClientSupervisor() {
+ _supervisor.ShutDown(true);
+ }
+ FRT_Supervisor& get() { return _supervisor; }
+
+};
+
+struct RpcForwarderTest : public ::testing::Test {
+ RpcServer server;
+ std::shared_ptr<MockMetricsManager> metrics_mgr;
+ Metrics metrics;
+ ClientSupervisor supervisor;
+ RpcForwarder forwarder;
+ RpcForwarderTest()
+ : server(),
+ metrics_mgr(std::make_shared<MockMetricsManager>()),
+ metrics(metrics_mgr),
+ forwarder(metrics, supervisor.get(), "localhost", server.get_listen_port(), 60.0, 3)
+ {
+ ForwardMap forward_filter;
+ forward_filter[ns_log::Logger::error] = true;
+ forward_filter[ns_log::Logger::warning] = false;
+ forward_filter[ns_log::Logger::info] = true;
+ // all other log levels are implicit false
+ forwarder.set_forward_filter(forward_filter);
+ }
+ void forward_line(const std::string& payload) {
+ forwarder.forwardLine(make_log_line("info", payload));
+ }
+ void forward_line(const std::string& level, const std::string& payload) {
+ forwarder.forwardLine(make_log_line(level, payload));
+ }
+ void forward_bad_line() {
+ forwarder.forwardLine("badline");
+ }
+ void flush() {
+ forwarder.flush();
+ }
+ void expect_messages() {
+ expect_messages(0, {});
+ }
+ void expect_messages(int exp_request_count, const std::vector<std::string>& exp_messages) {
+ EXPECT_EQ(exp_request_count, server.request_count);
+ EXPECT_EQ(exp_messages, server.messages);
+ }
+};
+
+TEST_F(RpcForwarderTest, does_not_send_rpc_with_no_log_messages)
+{
+ expect_messages();
+ flush();
+ expect_messages();
+}
+
+TEST_F(RpcForwarderTest, can_send_rpc_with_single_log_message)
+{
+ forward_line("a");
+ expect_messages();
+ flush();
+ expect_messages(1, {"a"});
+}
+
+TEST_F(RpcForwarderTest, can_send_rpc_with_multiple_log_messages)
+{
+ forward_line("a");
+ forward_line("b");
+ expect_messages();
+ flush();
+ expect_messages(1, {"a", "b"});
+}
+
+TEST_F(RpcForwarderTest, automatically_sends_rpc_when_max_messages_limit_is_reached)
+{
+ forward_line("a");
+ forward_line("b");
+ expect_messages();
+ forward_line("c");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("d");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("e");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("f");
+ expect_messages(2, {"a", "b", "c", "d", "e", "f"});
+}
+
+TEST_F(RpcForwarderTest, bad_log_lines_are_counted_but_not_sent)
+{
+ forward_line("a");
+ forward_bad_line();
+ EXPECT_EQ(1, forwarder.badLines());
+ flush();
+ expect_messages(1, {"a"});
+}
+
+TEST_F(RpcForwarderTest, bad_log_lines_count_can_be_reset)
+{
+ forward_bad_line();
+ EXPECT_EQ(1, forwarder.badLines());
+ forwarder.resetBadLines();
+ EXPECT_EQ(0, forwarder.badLines());
+}
+
+TEST_F(RpcForwarderTest, metrics_are_updated_for_each_log_message)
+{
+ forward_line("a");
+ EXPECT_EQ(1, metrics_mgr->add_count);
+ forward_line("b");
+ EXPECT_EQ(2, metrics_mgr->add_count);
+}
+
+TEST_F(RpcForwarderTest, log_messages_are_filtered_on_log_level)
+{
+ forward_line("fatal", "a");
+ forward_line("error", "b");
+ forward_line("warning", "c");
+ forward_line("config", "d");
+ forward_line("info", "e");
+ forward_line("event", "f");
+ forward_line("debug", "g");
+ forward_line("spam", "h");
+ forward_line("null", "i");
+ flush();
+ expect_messages(1, {"b", "e"});
+ EXPECT_EQ(9, metrics_mgr->add_count);
+}
+
+TEST_F(RpcForwarderTest, throws_when_rpc_reply_contains_errors)
+{
+ server.reply_with_error = true;
+ forward_line("a");
+ EXPECT_THROW(flush(), logdemon::ConnectionException);
+}
+
+TEST_F(RpcForwarderTest, throws_when_rpc_reply_does_not_contain_proto_response)
+{
+ server.reply_with_proto_response = false;
+ forward_line("a");
+ EXPECT_THROW(flush(), logdemon::DecodeException);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
+