summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fnet/src/vespa/fnet/transport_debugger.cpp4
-rw-r--r--fnet/src/vespa/fnet/transport_debugger.h18
-rw-r--r--slobrok/CMakeLists.txt1
-rw-r--r--slobrok/src/tests/rpc_mapping_monitor/CMakeLists.txt9
-rw-r--r--slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp224
-rw-r--r--slobrok/src/vespa/slobrok/server/mapping_monitor.h2
6 files changed, 250 insertions, 8 deletions
diff --git a/fnet/src/vespa/fnet/transport_debugger.cpp b/fnet/src/vespa/fnet/transport_debugger.cpp
index 1878b921171..32179aba254 100644
--- a/fnet/src/vespa/fnet/transport_debugger.cpp
+++ b/fnet/src/vespa/fnet/transport_debugger.cpp
@@ -50,10 +50,10 @@ TransportDebugger::attach(std::initializer_list<std::reference_wrapper<FNET_Tran
}
void
-TransportDebugger::step()
+TransportDebugger::step(vespalib::duration time_passed)
{
REQUIRE(_meet);
- _time += 5ms; // pretend 5ms passes between each event loop iteration
+ _time += time_passed; // pretend time passes between each event loop iteration
REQUIRE(_meet->rendezvous(true)); // release transport threads
REQUIRE(_meet->rendezvous(true)); // capture transport threads
}
diff --git a/fnet/src/vespa/fnet/transport_debugger.h b/fnet/src/vespa/fnet/transport_debugger.h
index ed3738bb9fe..30b0c4dcb1c 100644
--- a/fnet/src/vespa/fnet/transport_debugger.h
+++ b/fnet/src/vespa/fnet/transport_debugger.h
@@ -21,10 +21,10 @@ namespace fnet {
* is used to start controlling event loop execution. While attached,
* calling the step function will run each transport thread event loop
* exactly once (in parallel), wait for pending dns resolving, wait
- * for pending tls handshake work and advance the current time with
- * 5ms (making sure 'time passes' and 'stuff happens' at a reasonable
- * relative rate). It is important to call detach to release the
- * transports before trying to shut them down.
+ * for pending tls handshake work and advance the current time (the
+ * default 5ms will make sure 'time passes' and 'stuff happens' at a
+ * reasonable relative rate). It is important to call detach to
+ * release the transports before trying to shut them down.
*
* Note that both server and client should be controlled by the same
* debugger when testing rpc. Using external services will result in
@@ -53,7 +53,15 @@ public:
return TimeTools::make_debug(vespalib::duration::zero(), [this]() noexcept { return time(); });
}
void attach(std::initializer_list<std::reference_wrapper<FNET_Transport> > list);
- void step();
+ void step(vespalib::duration time_passed = 5ms);
+    // Keep stepping the attached transports until 'pred' holds or at
+    // least 'time_limit' of debugger time has passed since this call.
+    // The predicate is evaluated once more after stepping has stopped
+    // and that value is returned (true means the condition was reached).
+    template <typename Pred>
+    bool step_until(Pred pred, vespalib::duration time_limit = 120s) {
+        auto start = time();
+        for (;;) {
+            if (pred()) {
+                break;
+            }
+            if ((time() - start) >= time_limit) {
+                break;
+            }
+            step();
+        }
+        return pred();
+    }
void detach();
};
diff --git a/slobrok/CMakeLists.txt b/slobrok/CMakeLists.txt
index c6c6313cf68..92fc393418f 100644
--- a/slobrok/CMakeLists.txt
+++ b/slobrok/CMakeLists.txt
@@ -22,6 +22,7 @@ vespa_define_module(
src/tests/local_rpc_monitor_map
src/tests/mirrorapi
src/tests/registerapi
+ src/tests/rpc_mapping_monitor
src/tests/service_map_history
src/tests/service_map_mirror
src/tests/standalone
diff --git a/slobrok/src/tests/rpc_mapping_monitor/CMakeLists.txt b/slobrok/src/tests/rpc_mapping_monitor/CMakeLists.txt
new file mode 100644
index 00000000000..a5de3338309
--- /dev/null
+++ b/slobrok/src/tests/rpc_mapping_monitor/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(slobrok_rpc_mapping_monitor_test_app TEST
+ SOURCES
+ rpc_mapping_monitor_test.cpp
+ DEPENDS
+ slobrok_slobrokserver
+ GTest::GTest
+)
+vespa_add_test(NAME slobrok_rpc_mapping_monitor_test_app COMMAND slobrok_rpc_mapping_monitor_test_app)
diff --git a/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp b/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp
new file mode 100644
index 00000000000..f503453f934
--- /dev/null
+++ b/slobrok/src/tests/rpc_mapping_monitor/rpc_mapping_monitor_test.cpp
@@ -0,0 +1,224 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/slobrok/server/rpc_mapping_monitor.h>
+#include <vespa/fnet/transport_debugger.h>
+#include <vespa/fnet/transport_thread.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/vespalib/util/require.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <map>
+
+using namespace vespalib;
+using namespace slobrok;
+using vespalib::make_string_short::fmt;
+
+// Simple rpc server implementing the slobrok call-back API needed by
+// the monitor under test. It remembers (and holds a reference to) the
+// connection used by the most recent listNamesServed request, and can
+// be told to fail a number of upcoming listNamesServed requests.
+struct Server : FRT_Invokable {
+    fnet::frt::StandaloneFRT frt;
+    std::vector<vespalib::string> names; // names returned by listNamesServed
+    size_t inject_fail_cnt;              // number of upcoming listNamesServed requests to fail
+    FNET_Connection *last_conn;          // connection of the latest listNamesServed request (ref held) or nullptr
+    // Replace the remembered connection. The new connection is
+    // referenced before the old one is released, making it safe to
+    // pass the connection that is already remembered.
+    void set_last_conn(FNET_Connection *conn) {
+        if (conn) {
+            conn->AddRef();
+        }
+        if (last_conn) {
+            last_conn->SubRef();
+        }
+        last_conn = conn;
+    }
+    // Set up an rpc server (using the given time tools) that exposes
+    // the call-back methods and listens to an ephemeral port.
+    explicit Server(fnet::TimeTools::SP time_tools) : frt(TransportConfig().time_tools(time_tools)), names(),
+                                                      inject_fail_cnt(0), last_conn(nullptr)
+    {
+        FRT_ReflectionBuilder rb(&frt.supervisor());
+        rb.DefineMethod("slobrok.callback.listNamesServed", "", "S", FRT_METHOD(Server::rpc_listNamesServed), this);
+        rb.DefineMethod("slobrok.callback.notifyUnregistered", "s", "", FRT_METHOD(Server::rpc_notifyUnregistered), this);
+        REQUIRE(frt.supervisor().Listen(0));
+    }
+    // The destructor releases a connection reference; a copy would
+    // release the same reference twice.
+    Server(const Server &) = delete;
+    Server &operator=(const Server &) = delete;
+    ~Server() { set_last_conn(nullptr); }
+    // connection spec for the port this server is listening to
+    vespalib::string spec() const { return fmt("tcp/%d", frt.supervisor().GetListenPort()); }
+    FNET_Transport &transport() { return *frt.supervisor().GetTransport(); }
+    // Remember the connection used, then either fail the request (while
+    // there are injected failures left) or return the configured names.
+    void rpc_listNamesServed(FRT_RPCRequest *req) {
+        set_last_conn(req->GetConnection());
+        if (inject_fail_cnt > 0) {
+            req->SetError(FRTE_RPC_METHOD_FAILED, "fail injected by unit test");
+            --inject_fail_cnt;
+        } else {
+            FRT_Values &dst = *req->GetReturn();
+            FRT_StringValue *names_out = dst.AddStringArray(names.size());
+            for (size_t i = 0; i < names.size(); ++i) {
+                dst.SetString(&names_out[i], names[i].c_str());
+            }
+        }
+    }
+    // accepted, but intentionally ignored by this test server
+    void rpc_notifyUnregistered(FRT_RPCRequest *) {}
+};
+
+// State sampled for a service mapping. ANY is both the placeholder
+// used before anything has been sampled and the wildcard used when
+// counting samples regardless of state.
+enum class State {
+    ANY,
+    UP,
+    DOWN
+};
+
+// Run-length-encoded history of the states sampled for a single
+// service mapping. The history is seeded with an empty ANY run, so
+// there is always a last run to inspect.
+struct States {
+    struct Entry {
+        State state;
+        size_t cnt;
+    };
+    std::vector<Entry> hist;
+    States() : hist({{State::ANY, 0}}) {}
+    // most recently sampled state (ANY if nothing has been sampled)
+    State state() const { return hist.back().state; }
+    // record one sample; extends the last run or starts a new one
+    void sample(State state) {
+        Entry &last = hist.back();
+        if (last.state != state) {
+            hist.push_back(Entry{state, 1});
+            return;
+        }
+        ++last.cnt;
+    }
+    // number of samples with the given state (all samples for ANY)
+    size_t samples(State state = State::ANY) const {
+        const bool count_all = (state == State::ANY);
+        size_t total = 0;
+        for (const Entry &run: hist) {
+            if (count_all || (run.state == state)) {
+                total += run.cnt;
+            }
+        }
+        return total;
+    }
+};
+
+// Collects the up/down call-backs received so far, keeping a separate
+// sample history for each service mapping.
+struct History : MappingMonitorOwner {
+    std::map<ServiceMapping, States> map;
+    void up(const ServiceMapping &mapping) override {
+        States &states = map[mapping];
+        states.sample(State::UP);
+    }
+    void down(const ServiceMapping &mapping) override {
+        States &states = map[mapping];
+        states.sample(State::DOWN);
+    }
+};
+
+// Test fixture: the monitor under test (with its own transport) and
+// two call-back servers, all sharing the time tools of, and attached
+// to, a single transport debugger. Server 'a' serves 'foo' and 'bar',
+// server 'b' serves 'baz'.
+struct RpcMappingMonitorTest : public ::testing::Test {
+    fnet::TransportDebugger debugger;
+    fnet::frt::StandaloneFRT my_frt;
+    Server a;
+    Server b;
+    History hist;
+    std::unique_ptr<RpcMappingMonitor> monitor;
+    ServiceMapping foo_a;
+    ServiceMapping bar_a;
+    ServiceMapping baz_b;
+    RpcMappingMonitorTest()
+      : debugger(),
+        my_frt(TransportConfig().time_tools(debugger.time_tools())),
+        a(debugger.time_tools()),
+        b(debugger.time_tools()),
+        hist(),
+        monitor(),
+        foo_a("foo", a.spec()),
+        bar_a("bar", a.spec()),
+        baz_b("baz", b.spec())
+    {
+        FNET_Transport &my_transport = *my_frt.supervisor().GetTransport();
+        debugger.attach({my_transport, a.transport(), b.transport()});
+        monitor = std::make_unique<RpcMappingMonitor>(my_frt.supervisor(), hist);
+        a.names.emplace_back(foo_a.name);
+        a.names.emplace_back(bar_a.name);
+        b.names.emplace_back(baz_b.name);
+    }
+    ~RpcMappingMonitorTest() {
+        // drop the monitor first, then release the transports from the
+        // debugger before the members owning them are destructed
+        monitor.reset();
+        debugger.detach();
+    }
+};
+
+// All started mappings are sampled repeatedly and none of them are
+// ever reported as down.
+TEST_F(RpcMappingMonitorTest, services_can_be_monitored) {
+    monitor->start(foo_a, false);
+    monitor->start(bar_a, false);
+    monitor->start(baz_b, false);
+    auto sampled_enough = [&](const ServiceMapping &mapping) {
+        return (hist.map[mapping].samples() >= 3);
+    };
+    auto all_sampled_enough = [&]() {
+        return (sampled_enough(foo_a) && sampled_enough(bar_a) && sampled_enough(baz_b));
+    };
+    EXPECT_TRUE(debugger.step_until(all_sampled_enough));
+    EXPECT_EQ(hist.map[foo_a].samples(State::DOWN), 0);
+    EXPECT_EQ(hist.map[bar_a].samples(State::DOWN), 0);
+    EXPECT_EQ(hist.map[baz_b].samples(State::DOWN), 0);
+}
+
+// A mapping started with the second argument set to true (the 'hurry'
+// case, going by the test name) gets its first sample much earlier
+// than a mapping started with false.
+TEST_F(RpcMappingMonitorTest, hurry_means_faster) {
+    monitor->start(foo_a, false);
+    monitor->start(baz_b, true);
+    auto t0 = debugger.time();
+    EXPECT_TRUE(debugger.step_until([&]() {
+                return ((hist.map[baz_b].samples() > 0)); }));
+    EXPECT_EQ(hist.map[foo_a].samples(), 0);
+    auto t1 = debugger.time();
+    EXPECT_TRUE(debugger.step_until([&]() {
+                return ((hist.map[foo_a].samples() > 0)); }));
+    auto t2 = debugger.time();
+    // cast explicitly so that the arguments match the conversion specifiers
+    fprintf(stderr, "hurry: ~%lld ms, normal: ~%lld ms\n",
+            static_cast<long long>(count_ms(t1-t0)),
+            static_cast<long long>(count_ms(t2-t0)));
+    EXPECT_GT((t2 - t0), 10 * (t1 - t0));
+    EXPECT_EQ(hist.map[foo_a].state(), State::UP);
+    EXPECT_EQ(hist.map[baz_b].state(), State::UP);
+}
+
+// After a mapping has been stopped it gets no more samples, while
+// other mappings are still being sampled.
+TEST_F(RpcMappingMonitorTest, stop_means_stop) {
+    monitor->start(foo_a, false);
+    monitor->start(baz_b, true);
+    auto baz_sampled_once = [&]() {
+        return ((hist.map[baz_b].samples() == 1));
+    };
+    EXPECT_TRUE(debugger.step_until(baz_sampled_once));
+    monitor->stop(baz_b);
+    auto foo_sampled_thrice = [&]() {
+        return ((hist.map[foo_a].samples() == 3));
+    };
+    EXPECT_TRUE(debugger.step_until(foo_sampled_thrice));
+    EXPECT_EQ(hist.map[baz_b].samples(), 1);
+    EXPECT_EQ(hist.map[foo_a].state(), State::UP);
+    EXPECT_EQ(hist.map[baz_b].state(), State::UP);
+}
+
+// Three ways a health check can fail: an invalid connection spec, a
+// failing rpc request (server 'a' fails its first two requests) and a
+// server not serving the name in question (server 'b' only serves
+// 'baz'). Only the mapping with injected failures recovers.
+TEST_F(RpcMappingMonitorTest, health_checks_may_fail) {
+    ServiceMapping bad_spec("foo", "this spec is invalid");
+    ServiceMapping failed_ping("foo", a.spec());
+    ServiceMapping missing_name("foo", b.spec());
+    a.inject_fail_cnt = 2;
+    monitor->start(bad_spec, true);
+    monitor->start(failed_ping, true);
+    monitor->start(missing_name, true);
+    auto failed_ping_is_up = [&]() {
+        return (hist.map[failed_ping].state() == State::UP);
+    };
+    EXPECT_TRUE(debugger.step_until(failed_ping_is_up));
+    EXPECT_EQ(hist.map[bad_spec].state(), State::DOWN);
+    EXPECT_EQ(hist.map[missing_name].state(), State::DOWN);
+    EXPECT_EQ(hist.map[failed_ping].samples(State::DOWN), 2);
+    EXPECT_EQ(hist.map[bad_spec].samples(State::UP), 0);
+    EXPECT_EQ(hist.map[missing_name].samples(State::UP), 0);
+}
+
+// Closing the connection on the server side takes the mapping down
+// exactly once; it then comes back up again.
+TEST_F(RpcMappingMonitorTest, loss_of_idle_connection_is_detected_and_recovered) {
+    auto foo_is_up = [&]() {
+        return (hist.map[foo_a].state() == State::UP);
+    };
+    auto foo_is_down = [&]() {
+        return (hist.map[foo_a].state() == State::DOWN);
+    };
+    monitor->start(foo_a, true);
+    EXPECT_TRUE(debugger.step_until(foo_is_up));
+    ASSERT_TRUE(a.last_conn);
+    a.last_conn->Owner()->Close(a.last_conn);
+    a.set_last_conn(nullptr);
+    EXPECT_TRUE(debugger.step_until(foo_is_down));
+    // down without new rpc check, will re-connect and come back up
+    EXPECT_FALSE(a.last_conn);
+    EXPECT_TRUE(debugger.step_until(foo_is_up));
+    EXPECT_EQ(hist.map[foo_a].samples(State::DOWN), 1);
+}
+
+// Two consecutive health checks of a mapping that is up arrive on the
+// same connection.
+TEST_F(RpcMappingMonitorTest, up_connection_is_reused) {
+    monitor->start(foo_a, true);
+    // fatal on failure: the connection is dereferenced further down
+    ASSERT_TRUE(debugger.step_until([&]() { return (a.last_conn != nullptr); }));
+    // take over the reference held by the server; this keeps the first
+    // connection object alive while waiting for the next request
+    auto my_conn = a.last_conn;
+    a.last_conn = nullptr;
+    EXPECT_TRUE(debugger.step_until([&]() { return (a.last_conn != nullptr); }));
+    EXPECT_EQ(a.last_conn, my_conn);
+    my_conn->SubRef(); // release the reference taken over above
+    EXPECT_EQ(hist.map[foo_a].state(), State::UP);
+}
+
+// Informational: prints the (debugger) time passing between two
+// consecutive health checks. Only verifies that both checks happen.
+TEST_F(RpcMappingMonitorTest, detect_ping_interval) {
+    monitor->start(foo_a, true);
+    EXPECT_TRUE(debugger.step_until([&]() { return (a.last_conn); }));
+    auto t1 = debugger.time();
+    a.set_last_conn(nullptr);
+    EXPECT_TRUE(debugger.step_until([&]() { return (a.last_conn); }));
+    auto t2 = debugger.time();
+    // cast explicitly so that the argument matches the conversion specifier
+    fprintf(stderr, "ping interval: ~%lld ms\n", static_cast<long long>(count_ms(t2-t1)));
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/slobrok/src/vespa/slobrok/server/mapping_monitor.h b/slobrok/src/vespa/slobrok/server/mapping_monitor.h
index 4ac89e7521a..1a83b32f87f 100644
--- a/slobrok/src/vespa/slobrok/server/mapping_monitor.h
+++ b/slobrok/src/vespa/slobrok/server/mapping_monitor.h
@@ -12,7 +12,7 @@ struct MappingMonitorOwner {
virtual void up(const ServiceMapping& mapping) = 0;
virtual void down(const ServiceMapping& mapping) = 0;
protected:
- ~MappingMonitorOwner() = default;
+ virtual ~MappingMonitorOwner() = default;
};
struct MappingMonitor {