summaryrefslogtreecommitdiffstats
path: root/logd
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2019-03-29 10:43:30 +0100
committerGitHub <noreply@github.com>2019-03-29 10:43:30 +0100
commitc02161249c161ee65f5a0be018439152b6cc8763 (patch)
treea50e9c6cb1c235a7994907e5ac6fd58d4ac9d367 /logd
parenta748f744ef4f14bac32c9c62e71512162325e3b9 (diff)
parent24e3f9a0da95f11d0603a727e1ea0c860422de2e (diff)
Merge pull request #8937 from vespa-engine/geirst/logd-implement-rpc-forwarder
Geirst/logd implement rpc forwarder
Diffstat (limited to 'logd')
-rw-r--r--logd/CMakeLists.txt1
-rw-r--r--logd/src/logd/CMakeLists.txt1
-rw-r--r--logd/src/logd/exceptions.h7
-rw-r--r--logd/src/logd/forwarder.h10
-rw-r--r--logd/src/logd/legacy_forwarder.cpp18
-rw-r--r--logd/src/logd/legacy_forwarder.h11
-rw-r--r--logd/src/logd/proto_converter.h3
-rw-r--r--logd/src/logd/rpc_forwarder.cpp151
-rw-r--r--logd/src/logd/rpc_forwarder.h45
-rw-r--r--logd/src/logd/watcher.cpp2
-rw-r--r--logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp5
-rw-r--r--logd/src/tests/rpc_forwarder/CMakeLists.txt9
-rw-r--r--logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp248
13 files changed, 487 insertions, 24 deletions
diff --git a/logd/CMakeLists.txt b/logd/CMakeLists.txt
index 9a5fdf32841..e16d3bd5179 100644
--- a/logd/CMakeLists.txt
+++ b/logd/CMakeLists.txt
@@ -17,6 +17,7 @@ vespa_define_module(
src/tests/legacy_forwarder
src/tests/proto_converter
src/tests/rotate
+ src/tests/rpc_forwarder
)
vespa_install_script(src/apps/retention/retention-enforcer.sh vespa-retention-enforcer sbin)
diff --git a/logd/src/logd/CMakeLists.txt b/logd/src/logd/CMakeLists.txt
index baf52f1d5d8..629c7b1637a 100644
--- a/logd/src/logd/CMakeLists.txt
+++ b/logd/src/logd/CMakeLists.txt
@@ -16,6 +16,7 @@ vespa_add_library(logd STATIC
legacy_forwarder.cpp
metrics.cpp
proto_converter.cpp
+ rpc_forwarder.cpp
state_reporter.cpp
watcher.cpp
${logd_PROTOBUF_SRCS}
diff --git a/logd/src/logd/exceptions.h b/logd/src/logd/exceptions.h
index b2ff3516b69..82e8b570c3b 100644
--- a/logd/src/logd/exceptions.h
+++ b/logd/src/logd/exceptions.h
@@ -18,7 +18,12 @@ public:
class ConnectionException : public MsgException
{
public:
- ConnectionException(const char *s) : MsgException(s) {}
+ ConnectionException(const std::string& msg) : MsgException(msg) {}
+};
+
+class DecodeException : public MsgException {
+public:
+ DecodeException(const std::string& msg) : MsgException(msg) {}
};
class SigTermException : public MsgException
diff --git a/logd/src/logd/forwarder.h b/logd/src/logd/forwarder.h
index a0a1c5f1ea5..93cfdb3de9f 100644
--- a/logd/src/logd/forwarder.h
+++ b/logd/src/logd/forwarder.h
@@ -2,8 +2,15 @@
#pragma once
+#include <vespa/log/log.h>
+#include <map>
+#include <string_view>
+
namespace logdemon {
+// Mapping saying if a level should be forwarded or not
+using ForwardMap = std::map<ns_log::Logger::LogLevel, bool>;
+
/**
* Interface used to forward log lines to something.
*/
@@ -11,7 +18,8 @@ class Forwarder {
public:
virtual ~Forwarder() {}
virtual void sendMode() = 0;
- virtual void forwardLine(const char *line, const char *eol) = 0;
+ virtual void forwardLine(std::string_view log_line) = 0;
+ virtual void flush() = 0;
virtual int badLines() const = 0;
virtual void resetBadLines() = 0;
};
diff --git a/logd/src/logd/legacy_forwarder.cpp b/logd/src/logd/legacy_forwarder.cpp
index b8b93a03530..6b2f430d388 100644
--- a/logd/src/logd/legacy_forwarder.cpp
+++ b/logd/src/logd/legacy_forwarder.cpp
@@ -58,26 +58,24 @@ LegacyForwarder::sendMode()
}
void
-LegacyForwarder::forwardLine(const char *line, const char *eol)
+LegacyForwarder::forwardLine(std::string_view line)
{
- int linelen = eol - line;
-
assert(_logserverfd >= 0);
- assert (linelen > 0);
- assert (linelen < 1024*1024);
- assert (line[linelen - 1] == '\n');
+ assert (line.size() > 0);
+ assert (line.size() < 1024*1024);
+ assert (line[line.size() - 1] == '\n');
- if (parseline(line, eol)) {
- forwardText(line, linelen);
+ if (parseLine(line)) {
+ forwardText(line.data(), line.size());
}
}
bool
-LegacyForwarder::parseline(const char *linestart, const char *lineend)
+LegacyForwarder::parseLine(std::string_view line)
{
LogMessage message;
try {
- message.parse_log_line(std::string_view(linestart, lineend - linestart));
+ message.parse_log_line(line);
} catch (BadLogLineException &e) {
LOG(spam, "bad logline: %s", e.what());
++_badLines;
diff --git a/logd/src/logd/legacy_forwarder.h b/logd/src/logd/legacy_forwarder.h
index 81a93ce1d50..db3bf84fd4f 100644
--- a/logd/src/logd/legacy_forwarder.h
+++ b/logd/src/logd/legacy_forwarder.h
@@ -2,15 +2,9 @@
#pragma once
#include "forwarder.h"
-#include <vespa/log/log.h>
-#include <map>
-#include <unordered_set>
namespace logdemon {
-// Mapping saying if a level should be forwarded or not
-using ForwardMap = std::map<ns_log::Logger::LogLevel, bool>;
-
struct Metrics;
/**
@@ -29,12 +23,13 @@ private:
ret[len] = '\0';
return ret;
}
- bool parseline(const char *linestart, const char *lineend);
+ bool parseLine(std::string_view line);
public:
LegacyForwarder(Metrics &metrics);
~LegacyForwarder();
void forwardText(const char *text, int len);
- void forwardLine(const char *line, const char *eol) override;
+ void forwardLine(std::string_view line) override;
+ void flush() override {}
void setForwardMap(const ForwardMap & forwardMap) { _forwardMap = forwardMap; }
void setLogserverFD(int fd) { _logserverfd = fd; }
int getLogserverFD() { return _logserverfd; }
diff --git a/logd/src/logd/proto_converter.h b/logd/src/logd/proto_converter.h
index 688648b99de..88749100736 100644
--- a/logd/src/logd/proto_converter.h
+++ b/logd/src/logd/proto_converter.h
@@ -1,5 +1,7 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
#include "log_protocol_proto.h"
#include <vespa/log/log_message.h>
#include <vector>
@@ -11,6 +13,7 @@ namespace logdemon {
*/
struct ProtoConverter {
using ProtoLogRequest = logserver::protocol::protobuf::LogRequest;
+ using ProtoLogResponse = logserver::protocol::protobuf::LogResponse;
using ProtoLogMessage = logserver::protocol::protobuf::LogMessage;
static void log_messages_to_proto(const std::vector<ns_log::LogMessage>& messages, ProtoLogRequest& proto);
diff --git a/logd/src/logd/rpc_forwarder.cpp b/logd/src/logd/rpc_forwarder.cpp
new file mode 100644
index 00000000000..e515f463db4
--- /dev/null
+++ b/logd/src/logd/rpc_forwarder.cpp
@@ -0,0 +1,151 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "exceptions.h"
+#include "metrics.h"
+#include "proto_converter.h"
+#include "rpc_forwarder.h"
+#include <vespa/log/exceptions.h>
+#include <vespa/vespalib/util/buffer.h>
+#include <vespa/vespalib/util/stringfmt.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".logd.rpc_forwarder");
+
+using ns_log::BadLogLineException;
+using ns_log::LogMessage;
+using vespalib::make_string;
+
+namespace logdemon {
+
+RpcForwarder::RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor,
+ const vespalib::string &hostname, int rpc_port,
+ double rpc_timeout_secs, size_t max_messages_per_request)
+ : _metrics(metrics),
+ _connection_spec(make_string("tcp/%s:%d", hostname.c_str(), rpc_port)),
+ _rpc_timeout_secs(rpc_timeout_secs),
+ _max_messages_per_request(max_messages_per_request),
+ _target(),
+ _messages(),
+ _bad_lines(0),
+ _forward_filter()
+{
+ _target = supervisor.GetTarget(_connection_spec.c_str());
+}
+
+RpcForwarder::~RpcForwarder()
+{
+ _target->SubRef();
+}
+
+namespace {
+
+void
+encode_log_request(const ProtoConverter::ProtoLogRequest& src, FRT_RPCRequest& dst)
+{
+ dst.SetMethodName("vespa.logserver.archiveLogMessages");
+ auto buf = src.SerializeAsString();
+ auto& params = *dst.GetParams();
+ params.AddInt8(0); // '0' indicates no compression
+ params.AddInt32(buf.size());
+ params.AddData(buf.data(), buf.size());
+}
+
+bool
+decode_log_response(FRT_RPCRequest& src, ProtoConverter::ProtoLogResponse& dst)
+{
+ auto& values = *src.GetReturn();
+ uint8_t encoding = values[0]._intval8;
+ assert(encoding == 0); // Not using compression
+ uint32_t uncompressed_size = values[1]._intval32;
+ (void) uncompressed_size;
+ return dst.ParseFromArray(values[2]._data._buf, values[2]._data._len);
+}
+
+bool
+should_forward_log_message(const LogMessage& message, const ForwardMap& filter)
+{
+ auto found = filter.find(message.level());
+ if (found != filter.end()) {
+ return found->second;
+ }
+ return false;
+}
+
+}
+
+void
+RpcForwarder::forwardLine(std::string_view line)
+{
+ LogMessage message;
+ try {
+ message.parse_log_line(line);
+ } catch (BadLogLineException &e) {
+ LOG(spam, "Skipping bad logline: %s", e.what());
+ ++_bad_lines;
+ return;
+ }
+ _metrics.countLine(ns_log::Logger::logLevelNames[message.level()], message.service());
+ if (should_forward_log_message(message, _forward_filter)) {
+ _messages.push_back(std::move(message));
+ if (_messages.size() == _max_messages_per_request) {
+ flush();
+ }
+ }
+}
+
+namespace {
+
+class GuardedRequest {
+private:
+ FRT_RPCRequest* _request;
+public:
+ GuardedRequest()
+ : _request(new FRT_RPCRequest())
+ {}
+ ~GuardedRequest() {
+ _request->SubRef();
+ }
+ FRT_RPCRequest& operator*() const { return *_request; }
+ FRT_RPCRequest* get() const { return _request; }
+ FRT_RPCRequest* operator->() const { return get(); }
+};
+
+}
+
+void
+RpcForwarder::flush()
+{
+ if (_messages.empty()) {
+ return;
+ }
+ ProtoConverter::ProtoLogRequest proto_request;
+ ProtoConverter::log_messages_to_proto(_messages, proto_request);
+ GuardedRequest request;
+ encode_log_request(proto_request, *request);
+ _target->InvokeSync(request.get(), _rpc_timeout_secs);
+ if (!request->CheckReturnTypes("bix")) {
+ auto error_msg = make_string("Error in rpc reply from '%s': '%s'",
+ _connection_spec.c_str(), request->GetErrorMessage());
+ throw ConnectionException(error_msg);
+ }
+ ProtoConverter::ProtoLogResponse proto_response;
+ if (!decode_log_response(*request, proto_response)) {
+ auto error_msg = make_string("Error during decoding of protobuf response from '%s'", _connection_spec.c_str());
+ throw DecodeException(error_msg);
+ }
+ _messages.clear();
+}
+
+int
+RpcForwarder::badLines() const
+{
+ return _bad_lines;
+}
+
+void
+RpcForwarder::resetBadLines()
+{
+ _bad_lines = 0;
+}
+
+}
diff --git a/logd/src/logd/rpc_forwarder.h b/logd/src/logd/rpc_forwarder.h
new file mode 100644
index 00000000000..3212da08195
--- /dev/null
+++ b/logd/src/logd/rpc_forwarder.h
@@ -0,0 +1,45 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "forwarder.h"
+#include "proto_converter.h"
+#include <vespa/log/log_message.h>
+#include <vespa/fnet/frt/frt.h>
+#include <vector>
+
+namespace logdemon {
+
+struct Metrics;
+
+/**
+ * Implementation of the Forwarder interface that uses RPC to send protobuf encoded log messages to the logserver.
+ */
+class RpcForwarder : public Forwarder {
+private:
+ Metrics& _metrics;
+ vespalib::string _connection_spec;
+ double _rpc_timeout_secs;
+ size_t _max_messages_per_request;
+ FRT_Target* _target;
+ std::vector<ns_log::LogMessage> _messages;
+ int _bad_lines;
+ ForwardMap _forward_filter;
+
+public:
+ RpcForwarder(Metrics& metrics, FRT_Supervisor& supervisor,
+ const vespalib::string& logserver_host, int logserver_rpc_port,
+ double rpc_timeout_secs, size_t max_messages_per_request);
+ ~RpcForwarder() override;
+ void set_forward_filter(const ForwardMap& forward_filter) { _forward_filter = forward_filter; }
+
+ // Implements Forwarder
+ void sendMode() override {}
+ void forwardLine(std::string_view line) override;
+ void flush() override;
+ int badLines() const override;
+ void resetBadLines() override;
+};
+
+}
+
diff --git a/logd/src/logd/watcher.cpp b/logd/src/logd/watcher.cpp
index a047c110f32..c505d2dd235 100644
--- a/logd/src/logd/watcher.cpp
+++ b/logd/src/logd/watcher.cpp
@@ -220,7 +220,7 @@ Watcher::watchfile()
}
while (nnl != nullptr && elapsed(tickStart) < 1) {
++nnl;
- _forwarder.forwardLine(l, nnl);
+ _forwarder.forwardLine(std::string_view(l, (nnl - l)));
ssize_t wsize = nnl - l;
offset += wsize;
l = nnl;
diff --git a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
index 3af35f9aa09..c6702e8bc67 100644
--- a/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
+++ b/logd/src/tests/legacy_forwarder/legacy_forwarder_test.cpp
@@ -40,13 +40,12 @@ struct ForwardFixture {
}
void verifyForward(bool doForward) {
- const std::string & line(logLine);
- forwarder.forwardLine(line.c_str(), line.c_str() + line.length());
+ forwarder.forwardLine(logLine);
fsync(fd);
int rfd = open(fname.c_str(), O_RDONLY);
char *buffer[2048];
ssize_t bytes = read(rfd, buffer, 2048);
- ssize_t expected = doForward ? line.length() : 0;
+ ssize_t expected = doForward ? logLine.length() : 0;
EXPECT_EQUAL(expected, bytes);
close(rfd);
}
diff --git a/logd/src/tests/rpc_forwarder/CMakeLists.txt b/logd/src/tests/rpc_forwarder/CMakeLists.txt
new file mode 100644
index 00000000000..66a30777b41
--- /dev/null
+++ b/logd/src/tests/rpc_forwarder/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(logd_rpc_forwarder_test_app TEST
+ SOURCES
+ rpc_forwarder_test.cpp
+ DEPENDS
+ logd
+ gtest
+)
+vespa_add_test(NAME logd_rpc_forwarder_test_app COMMAND logd_rpc_forwarder_test_app)
diff --git a/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
new file mode 100644
index 00000000000..30ca5e19d44
--- /dev/null
+++ b/logd/src/tests/rpc_forwarder/rpc_forwarder_test.cpp
@@ -0,0 +1,248 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <logd/exceptions.h>
+#include <logd/metrics.h>
+#include <logd/rpc_forwarder.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/metrics/dummy_metrics_manager.h>
+
+using namespace logdemon;
+using vespalib::metrics::DummyMetricsManager;
+
+void
+encode_log_response(const ProtoConverter::ProtoLogResponse& src, FRT_Values& dst)
+{
+ auto buf = src.SerializeAsString();
+ dst.AddInt8(0);
+ dst.AddInt32(buf.size());
+ dst.AddData(buf.data(), buf.size());
+}
+
+bool
+decode_log_request(FRT_Values& src, ProtoConverter::ProtoLogRequest& dst)
+{
+ uint8_t encoding = src[0]._intval8;
+ assert(encoding == 0);
+ uint32_t uncompressed_size = src[1]._intval32;
+ assert(uncompressed_size == src[2]._data._len);
+ return dst.ParseFromArray(src[2]._data._buf, src[2]._data._len);
+}
+
+std::string garbage("garbage");
+
+struct RpcServer : public FRT_Invokable {
+ FRT_Supervisor supervisor;
+ int request_count;
+ std::vector<std::string> messages;
+ bool reply_with_error;
+ bool reply_with_proto_response;
+
+public:
+ RpcServer()
+ : supervisor(),
+ request_count(0),
+ messages(),
+ reply_with_error(false),
+ reply_with_proto_response(true)
+ {
+ supervisor.Listen(0);
+ FRT_ReflectionBuilder builder(&supervisor);
+ builder.DefineMethod("vespa.logserver.archiveLogMessages", "bix", "bix",
+ FRT_METHOD(RpcServer::rpc_archive_log_messages), this);
+ supervisor.Start();
+ }
+ ~RpcServer() {
+ supervisor.ShutDown(true);
+ }
+ int get_listen_port() {
+ return supervisor.GetListenPort();
+ }
+ void rpc_archive_log_messages(FRT_RPCRequest* request) {
+ ProtoConverter::ProtoLogRequest proto_request;
+ ASSERT_TRUE(decode_log_request(*request->GetParams(), proto_request));
+ ++request_count;
+ for (const auto& message : proto_request.log_messages()) {
+ messages.push_back(message.payload());
+ }
+ if (reply_with_error) {
+ request->SetError(123, "This is a server error");
+ return;
+ }
+ if (reply_with_proto_response) {
+ ProtoConverter::ProtoLogResponse proto_response;
+ encode_log_response(proto_response, *request->GetReturn());
+ } else {
+ auto& dst = *request->GetReturn();
+ dst.AddInt8(0);
+ dst.AddInt32(garbage.size());
+ dst.AddData(garbage.data(), garbage.size());
+ }
+ }
+};
+
+std::string
+make_log_line(const std::string& level, const std::string& payload)
+{
+ return "1234.5678\tmy_host\t10/20\tmy_service\tmy_component\t" + level + "\t" + payload;
+}
+
+struct MockMetricsManager : public DummyMetricsManager {
+ int add_count;
+ MockMetricsManager() : DummyMetricsManager(), add_count(0) {}
+ void add(Counter::Increment) override {
+ ++add_count;
+ }
+};
+
+class ClientSupervisor {
+private:
+ FRT_Supervisor _supervisor;
+public:
+ ClientSupervisor()
+ : _supervisor()
+ {
+ _supervisor.Start();
+ }
+ ~ClientSupervisor() {
+ _supervisor.ShutDown(true);
+ }
+ FRT_Supervisor& get() { return _supervisor; }
+
+};
+
+struct RpcForwarderTest : public ::testing::Test {
+ RpcServer server;
+ std::shared_ptr<MockMetricsManager> metrics_mgr;
+ Metrics metrics;
+ ClientSupervisor supervisor;
+ RpcForwarder forwarder;
+ RpcForwarderTest()
+ : server(),
+ metrics_mgr(std::make_shared<MockMetricsManager>()),
+ metrics(metrics_mgr),
+ forwarder(metrics, supervisor.get(), "localhost", server.get_listen_port(), 60.0, 3)
+ {
+ ForwardMap forward_filter;
+ forward_filter[ns_log::Logger::error] = true;
+ forward_filter[ns_log::Logger::warning] = false;
+ forward_filter[ns_log::Logger::info] = true;
+ // all other log levels are implicit false
+ forwarder.set_forward_filter(forward_filter);
+ }
+ void forward_line(const std::string& payload) {
+ forwarder.forwardLine(make_log_line("info", payload));
+ }
+ void forward_line(const std::string& level, const std::string& payload) {
+ forwarder.forwardLine(make_log_line(level, payload));
+ }
+ void forward_bad_line() {
+ forwarder.forwardLine("badline");
+ }
+ void flush() {
+ forwarder.flush();
+ }
+ void expect_messages() {
+ expect_messages(0, {});
+ }
+ void expect_messages(int exp_request_count, const std::vector<std::string>& exp_messages) {
+ EXPECT_EQ(exp_request_count, server.request_count);
+ EXPECT_EQ(exp_messages, server.messages);
+ }
+};
+
+TEST_F(RpcForwarderTest, does_not_send_rpc_with_no_log_messages)
+{
+ expect_messages();
+ flush();
+ expect_messages();
+}
+
+TEST_F(RpcForwarderTest, can_send_rpc_with_single_log_message)
+{
+ forward_line("a");
+ expect_messages();
+ flush();
+ expect_messages(1, {"a"});
+}
+
+TEST_F(RpcForwarderTest, can_send_rpc_with_multiple_log_messages)
+{
+ forward_line("a");
+ forward_line("b");
+ expect_messages();
+ flush();
+ expect_messages(1, {"a", "b"});
+}
+
+TEST_F(RpcForwarderTest, automatically_sends_rpc_when_max_messages_limit_is_reached)
+{
+ forward_line("a");
+ forward_line("b");
+ expect_messages();
+ forward_line("c");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("d");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("e");
+ expect_messages(1, {"a", "b", "c"});
+ forward_line("f");
+ expect_messages(2, {"a", "b", "c", "d", "e", "f"});
+}
+
+TEST_F(RpcForwarderTest, bad_log_lines_are_counted_but_not_sent)
+{
+ forward_line("a");
+ forward_bad_line();
+ EXPECT_EQ(1, forwarder.badLines());
+ flush();
+ expect_messages(1, {"a"});
+}
+
+TEST_F(RpcForwarderTest, bad_log_lines_count_can_be_reset)
+{
+ forward_bad_line();
+ EXPECT_EQ(1, forwarder.badLines());
+ forwarder.resetBadLines();
+ EXPECT_EQ(0, forwarder.badLines());
+}
+
+TEST_F(RpcForwarderTest, metrics_are_updated_for_each_log_message)
+{
+ forward_line("a");
+ EXPECT_EQ(1, metrics_mgr->add_count);
+ forward_line("b");
+ EXPECT_EQ(2, metrics_mgr->add_count);
+}
+
+TEST_F(RpcForwarderTest, log_messages_are_filtered_on_log_level)
+{
+ forward_line("fatal", "a");
+ forward_line("error", "b");
+ forward_line("warning", "c");
+ forward_line("config", "d");
+ forward_line("info", "e");
+ forward_line("event", "f");
+ forward_line("debug", "g");
+ forward_line("spam", "h");
+ forward_line("null", "i");
+ flush();
+ expect_messages(1, {"b", "e"});
+ EXPECT_EQ(9, metrics_mgr->add_count);
+}
+
+TEST_F(RpcForwarderTest, throws_when_rpc_reply_contains_errors)
+{
+ server.reply_with_error = true;
+ forward_line("a");
+ EXPECT_THROW(flush(), logdemon::ConnectionException);
+}
+
+TEST_F(RpcForwarderTest, throws_when_rpc_reply_does_not_contain_proto_response)
+{
+ server.reply_with_proto_response = false;
+ forward_line("a");
+ EXPECT_THROW(flush(), logdemon::DecodeException);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
+