diff options
Diffstat (limited to 'logd/src/tests/rpc_forwarder')
-rw-r--r-- | logd/src/tests/rpc_forwarder/CMakeLists.txt | 9 | ||||
-rw-r--r-- | logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp | 162 |
2 files changed, 171 insertions, 0 deletions
diff --git a/logd/src/tests/rpc_forwarder/CMakeLists.txt b/logd/src/tests/rpc_forwarder/CMakeLists.txt new file mode 100644 index 00000000000..66a30777b41 --- /dev/null +++ b/logd/src/tests/rpc_forwarder/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(logd_rpc_forwarder_test_app TEST + SOURCES + rpc_forwarder_test.cpp + DEPENDS + logd + gtest +) +vespa_add_test(NAME logd_rpc_forwarder_test_app COMMAND logd_rpc_forwarder_test_app) diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp new file mode 100644 index 00000000000..183132bb39d --- /dev/null +++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp @@ -0,0 +1,162 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <logd/exceptions.h> +#include <logd/rpc_forwarder.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace logdemon; + +void +encode_log_response(const ProtoConverter::ProtoLogResponse& src, FRT_Values& dst) +{ + auto buf = src.SerializeAsString(); + dst.AddInt8(0); + dst.AddInt32(buf.size()); + dst.AddData(buf.data(), buf.size()); +} + +bool +decode_log_request(FRT_Values& src, ProtoConverter::ProtoLogRequest& dst) +{ + uint8_t encoding = src[0]._intval8; + assert(encoding == 0); + uint32_t uncompressed_size = src[1]._intval32; + assert(uncompressed_size == src[2]._data._len); + return dst.ParseFromArray(src[2]._data._buf, src[2]._data._len); +} + +std::string garbage("garbage"); + +struct RpcServer : public FRT_Invokable { + FRT_Supervisor supervisor; + int request_count; + std::vector<std::string> messages; + bool reply_with_error; + bool reply_with_proto_response; + +public: + RpcServer() + : supervisor(), + request_count(0), + messages(), + reply_with_error(false), + reply_with_proto_response(true) + { + supervisor.Listen(0); + supervisor.Start(); + FRT_ReflectionBuilder builder(&supervisor); + builder.DefineMethod("vespa.logserver.archiveLogMessages", "bix", "bix", + FRT_METHOD(RpcServer::rpc_archive_log_messages), this); + } + ~RpcServer() { + supervisor.ShutDown(true); + } + int get_listen_port() { + return supervisor.GetListenPort(); + } + void rpc_archive_log_messages(FRT_RPCRequest* request) { + ProtoConverter::ProtoLogRequest proto_request; + ASSERT_TRUE(decode_log_request(*request->GetParams(), proto_request)); + ++request_count; + for (const auto& message : proto_request.log_messages()) { + messages.push_back(message.payload()); + } + if (reply_with_error) { + request->SetError(123, "This is a server error"); + return; + } + if (reply_with_proto_response) { + ProtoConverter::ProtoLogResponse proto_response; + encode_log_response(proto_response, *request->GetReturn()); + } else { + auto& dst = *request->GetReturn(); + dst.AddInt8(0); + dst.AddInt32(garbage.size()); + dst.AddData(garbage.data(), garbage.size()); + } + } +}; + +std::string +make_log_line(const std::string& payload) +{ + return "1234.5678\tmy_host\t10/20\tmy_service\tmy_component\tinfo\t" + payload; +} + +struct RpcForwarderTest : public ::testing::Test { + RpcServer server; + RpcForwarder forwarder; + RpcForwarderTest() + : forwarder("localhost", server.get_listen_port(), 60.0, 3) + { + } + void forward_line(const std::string& payload) { + forwarder.forwardLine(make_log_line(payload)); + } + void flush() { + forwarder.flush(); + } + void expect_messages() { + expect_messages(0, {}); + } + void expect_messages(int exp_request_count, const std::vector<std::string>& exp_messages) { + EXPECT_EQ(exp_request_count, server.request_count); + EXPECT_EQ(exp_messages, server.messages); + } +}; + +TEST_F(RpcForwarderTest, does_not_send_rpc_with_no_log_messages) +{ + expect_messages(); + flush(); + expect_messages(); +} + +TEST_F(RpcForwarderTest, can_send_rpc_with_single_log_message) +{ + forward_line("a"); + expect_messages(); + flush(); + expect_messages(1, {"a"}); +} + +TEST_F(RpcForwarderTest, can_send_rpc_with_multiple_log_messages) +{ + forward_line("a"); + forward_line("b"); + expect_messages(); + flush(); + expect_messages(1, {"a", "b"}); +} + +TEST_F(RpcForwarderTest, automatically_sends_rpc_when_max_messages_limit_is_reached) +{ + forward_line("a"); + forward_line("b"); + expect_messages(); + forward_line("c"); + expect_messages(1, {"a", "b", "c"}); + forward_line("d"); + expect_messages(1, {"a", "b", "c"}); + forward_line("e"); + expect_messages(1, {"a", "b", "c"}); + forward_line("f"); + expect_messages(2, {"a", "b", "c", "d", "e", "f"}); +} + +TEST_F(RpcForwarderTest, throws_when_rpc_reply_contains_errors) +{ + server.reply_with_error = true; + forward_line("a"); + EXPECT_THROW(flush(), logdemon::ConnectionException); +} + +TEST_F(RpcForwarderTest, throws_when_rpc_reply_does_not_contain_proto_response) +{ + server.reply_with_proto_response = false; + forward_line("a"); + EXPECT_THROW(flush(), logdemon::DecodeException); +} + +GTEST_MAIN_RUN_ALL_TESTS() + |