summary | refs | log | tree | commit | diff | stats
path: root/config/src/tests/failover/failover.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'config/src/tests/failover/failover.cpp')
-rw-r--r-- config/src/tests/failover/failover.cpp 356
1 files changed, 356 insertions, 0 deletions
diff --git a/config/src/tests/failover/failover.cpp b/config/src/tests/failover/failover.cpp
new file mode 100644
index 00000000000..baaae8ba19a
--- /dev/null
+++ b/config/src/tests/failover/failover.cpp
@@ -0,0 +1,356 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("failover");
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/config/common/misc.h>
+#include <vespa/config/frt/protocol.h>
+#include <vespa/config/config.h>
+#include <vespa/fnet/frt/frt.h>
+#include "config-my.h"
+#include <vespa/vespalib/data/slime/slime.h>
+
+using namespace config;
+using vespalib::Barrier;
+using namespace config::protocol::v2;
+using namespace vespalib::slime;
+using namespace vespalib;
+
+namespace {
+
+// Returns the number formed by the trailing run of decimal digits in spec
+// (e.g. the port in "tcp/localhost:18590"). When spec does not end in a
+// digit, atoi() is handed an empty string and the result is 0.
+int get_port(const vespalib::string &spec) {
+    const char * const first = spec.data();
+    const char *cursor = first + spec.size();
+    // Step backwards for as long as the character before the cursor is a digit.
+    for (; cursor > first; --cursor) {
+        const char previous = *(cursor - 1);
+        if ((previous < '0') || (previous > '9')) {
+            break;
+        }
+    }
+    return atoi(cursor);
+}
+
+// FRT type strings passed to DefineMethod() for "config.v3.getConfig" in RPCServer::init().
+// RPCServer::getConfig() returns one string value followed by one data value, matching "sx".
+// NOTE(review): "s" presumably denotes a single string parameter -- confirm against the FRT docs.
+const vespalib::string requestTypes = "s";
+const vespalib::string responseTypes = "sx";
+
+// Stand-in config server endpoint used by the failover tests. Every call to
+// "config.v3.getConfig" is answered with the same MyConfig payload
+// (myField = "myval"), tagged with the current value of gen.
+struct RPCServer : public FRT_Invokable {
+    FRT_Supervisor * supervisor;
+    vespalib::Barrier barrier;
+    int64_t gen; // generation reported in responses; bumped by reload()
+
+    RPCServer()
+        : supervisor(NULL),
+          barrier(2),
+          gen(1)
+    {
+    }
+
+    // Registers getConfig() on the given supervisor as "config.v3.getConfig".
+    void init(FRT_Supervisor * s) {
+        FRT_ReflectionBuilder builder(s);
+        builder.DefineMethod("config.v3.getConfig", requestTypes.c_str(), responseTypes.c_str(), true,
+                             FRT_METHOD(RPCServer::getConfig), this);
+    }
+
+    // RPC handler. Adds two return values to req: the JSON-encoded response
+    // header as a string, then the JSON-encoded config payload as data.
+    void getConfig(FRT_RPCRequest * req)
+    {
+        // Response header describing the config definition and generation.
+        Slime header;
+        Cursor & root = header.setObject();
+        root.setLong(RESPONSE_VERSION, 3);
+        root.setString(RESPONSE_DEF_NAME, Memory(MyConfig::CONFIG_DEF_NAME));
+        root.setString(RESPONSE_DEF_NAMESPACE, Memory(MyConfig::CONFIG_DEF_NAMESPACE));
+        root.setString(RESPONSE_DEF_MD5, Memory(MyConfig::CONFIG_DEF_MD5));
+        Cursor & compression = root.setObject("compressionInfo");
+        compression.setString("compressionType", "UNCOMPRESSED");
+        compression.setString("uncompressedSize", "0");
+        root.setString(RESPONSE_CONFIGID, "myId");
+        root.setString(RESPONSE_CLIENT_HOSTNAME, "myhost");
+        root.setString(RESPONSE_CONFIG_MD5, "md5");
+        root.setLong(RESPONSE_CONFIG_GENERATION, gen);
+        root.setObject(RESPONSE_TRACE);
+
+        SimpleBuffer headerBuf;
+        JsonFormat::encode(header, headerBuf, false);
+        const vespalib::string headerJson = headerBuf.get().make_string();
+
+        // The config payload itself: a single field.
+        Slime payload;
+        Cursor & fields = payload.setObject();
+        fields.setString("myField", "myval");
+
+        SimpleBuffer payloadBuf;
+        JsonFormat::encode(payload, payloadBuf, false);
+        const vespalib::string payloadJson = payloadBuf.get().make_string();
+
+        // Return values must be added in the order declared by responseTypes.
+        FRT_Values * ret = req->GetReturn();
+        ret->AddString(headerJson.c_str());
+        ret->AddData(payloadJson.c_str(), payloadJson.size());
+        LOG(info, "Answering...");
+    }
+
+    // Rendezvous on this server's own barrier.
+    void wait() {
+        barrier.await();
+    }
+
+    // Advances the generation reported by subsequent getConfig() answers.
+    void reload() {
+        ++gen;
+    }
+};
+
+
+// Asserts that a config object was delivered and that its myField equals "myval",
+// the value RPCServer::getConfig() places in the payload.
+void verifyConfig(std::unique_ptr<MyConfig> config)
+{
+ ASSERT_TRUE(config.get() != NULL);
+ ASSERT_EQUAL("myval", config->myField);
+}
+
+// Wraps one RPCServer together with the supervisor that serves it and a
+// barrier (b) used as the handshake between the thread that runs start()
+// and the thread that calls wait()/stop().
+struct ServerFixture {
+ typedef vespalib::LinkedPtr<ServerFixture> LP;
+ FRT_Supervisor * supervisor; // allocated in start(), deleted and reset to NULL in stop()
+ RPCServer server;
+ Barrier b; // constructed with a count of 2
+ const vespalib::string listenSpec; // spec whose trailing digits give the listen port
+ ServerFixture(const vespalib::string & ls)
+ : supervisor(NULL),
+ server(),
+ b(2),
+ listenSpec(ls)
+ {
+ }
+
+ // Rendezvous on the barrier b.
+ void wait()
+ {
+ b.await();
+ }
+
+ // Server-thread side of the handshake: creates the supervisor, registers the
+ // RPC method, starts listening, and then runs Main(). Performs three barrier
+ // rendezvous in total: one before Main() and two after it, pairing with one
+ // external wait() call and the two wait() calls inside stop().
+ // NOTE(review): any result of Listen() is discarded here, so a failed bind is not detected.
+ void start()
+ {
+ supervisor = new FRT_Supervisor();
+ server.init(supervisor);
+ supervisor->Listen(get_port(listenSpec));
+ wait(); // Wait until test runner signals we can start
+ supervisor->Main();
+ wait(); // Signalling that we have shut down
+ wait(); // Wait for signal saying that supervisor is deleted
+ }
+
+ // Controller side of the handshake: shuts the supervisor down, deletes it and
+ // releases the thread blocked in start(). Does nothing when supervisor is NULL,
+ // so a repeated call (e.g. from the destructor) is a no-op.
+ void stop()
+ {
+ if (supervisor != NULL) {
+ supervisor->ShutDown(true);
+ wait(); // Wait for supervisor to shut down
+ delete supervisor;
+ supervisor = NULL;
+ wait(); // Signal that we are done and start can return.
+ }
+ }
+
+ // Stops the server if it is still present when the fixture is destroyed.
+ ~ServerFixture() { stop(); }
+};
+
+// A set of ServerFixture instances, one per entry in the spec list handed to
+// the constructor, plus the ServerSpec built from that same list. All
+// per-server operations are addressed by index into serverList.
+struct NetworkFixture {
+    std::vector<ServerFixture::LP> serverList;
+    ServerSpec spec;
+    bool running; // run() keeps restarting its server while this is true
+
+    NetworkFixture(const std::vector<vespalib::string> & serverSpecs)
+        : spec(serverSpecs),
+          running(true)
+    {
+        for (const vespalib::string & listenSpec : serverSpecs) {
+            serverList.push_back(ServerFixture::LP(new ServerFixture(listenSpec)));
+        }
+    }
+
+    // Runs server i once; returns when that server has been stopped.
+    void start(size_t i) {
+        serverList[i]->start();
+    }
+
+    // Rendezvous with server i on its barrier.
+    void wait(size_t i) {
+        serverList[i]->wait();
+    }
+
+    // Rendezvous with every server in turn, in list order.
+    void waitAll() {
+        for (ServerFixture::LP & server : serverList) {
+            server->wait();
+        }
+    }
+
+    // Server-thread loop: restarts server i after each stop until stopAll() clears running.
+    void run(size_t i) {
+        while (running) {
+            start(i);
+        }
+    }
+
+    // Clears running first so run() loops terminate, then stops every server in list order.
+    void stopAll() {
+        running = false;
+        for (ServerFixture::LP & server : serverList) {
+            server->stop();
+        }
+    }
+
+    // Stops server i only.
+    void stop(size_t i) {
+        serverList[i]->stop();
+    }
+
+    // Bumps the config generation on every server.
+    void reload() {
+        for (ServerFixture::LP & server : serverList) {
+            server->server.reload();
+        }
+    }
+};
+
+
+// Timing settings passed to every ConfigContext that ConfigCheckFixture and
+// ConfigReloadFixture create.
+// NOTE(review): the values are presumably milliseconds -- confirm against TimingValues.
+TimingValues testTimingValues(
+ 500, // successTimeout
+ 500, // errorTimeout
+ 500, // initialTimeout
+ 400, // unsubscribeTimeout
+ 0, // fixedDelay
+ 250, // successDelay
+ 250, // unconfiguredDelay
+ 500, // configuredErrorDelay
+ 1, // maxDelayMultiplier
+ 600, // transientDelay
+ 1200); // fatalDelay
+
+// Client-side helper for the subscribe tests: holds a ConfigContext built from
+// testTimingValues and the NetworkFixture's server spec, and checks that a fresh
+// subscription obtains a config while selected servers are stopped.
+struct ConfigCheckFixture {
+ IConfigContext::SP ctx;
+ NetworkFixture & nf;
+
+ ConfigCheckFixture(NetworkFixture & f2)
+ : ctx(new ConfigContext(testTimingValues, f2.spec)),
+ nf(f2)
+ {
+ }
+ // Creates a new subscriber, subscribes to MyConfig with config id "myId"
+ // and asserts that nextConfig() succeeds.
+ void checkSubscribe()
+ {
+ ConfigSubscriber s(ctx);
+ // The handle is not read, but is kept alive until the end of this scope.
+ ConfigHandle<MyConfig>::UP handle = s.subscribe<MyConfig>("myId");
+ ASSERT_TRUE(s.nextConfig());
+ }
+ // Stops server `index`, verifies that subscribing still works, then
+ // rendezvous with that server via nf.wait(index).
+ void verifySubscribeFailover(size_t index)
+ {
+ nf.stop(index);
+ checkSubscribe();
+ nf.wait(index);
+ }
+
+ // Same as the single-index variant, but with two servers stopped at once.
+ void verifySubscribeFailover(size_t indexA, size_t indexB)
+ {
+ nf.stop(indexA);
+ nf.stop(indexB);
+ checkSubscribe();
+ nf.wait(indexA);
+ nf.wait(indexB);
+ }
+};
+
+// Client-side helper for the reload tests: keeps one long-lived subscription to
+// MyConfig ("myId") and checks that a new generation arrives after the servers'
+// generation counters are bumped, optionally with some servers stopped.
+struct ConfigReloadFixture {
+ IConfigContext::SP ctx;
+ NetworkFixture & nf;
+ ConfigSubscriber s;
+ ConfigHandle<MyConfig>::UP handle;
+
+ // Members are initialised in declaration order: s is built from ctx and
+ // handle from s.
+ ConfigReloadFixture(NetworkFixture & f2)
+ : ctx(new ConfigContext(testTimingValues, f2.spec)),
+ nf(f2),
+ s(ctx),
+ handle(s.subscribe<MyConfig>("myId"))
+ {
+ }
+
+ // Bumps the generation on all servers, asserts that nextGeneration()
+ // succeeds and verifies the delivered config contents.
+ void verifyReload()
+ {
+ nf.reload();
+ ASSERT_TRUE(s.nextGeneration());
+ verifyConfig(handle->getConfig());
+ }
+
+ // Stops server `index`, verifies a reload, then rendezvous with that
+ // server via nf.wait(index).
+ void verifyReloadFailover(size_t index)
+ {
+ nf.stop(index);
+ verifyReload();
+ nf.wait(index);
+ }
+
+ // Same as the single-index variant, but with two servers stopped at once.
+ void verifyReloadFailover(size_t indexA, size_t indexB)
+ {
+ nf.stop(indexA);
+ nf.stop(indexB);
+ verifyReload();
+ nf.wait(indexA);
+ nf.wait(indexB);
+ }
+};
+
+// Supplies the connection specs for a three-server setup on localhost
+// (ports 18590, 18592 and 18594).
+struct ThreeServersFixture {
+    std::vector<vespalib::string> specs;
+    ThreeServersFixture() : specs() {
+        static const char * const kServerSpecs[] = {
+            "tcp/localhost:18590",
+            "tcp/localhost:18592",
+            "tcp/localhost:18594"
+        };
+        for (const char * serverSpec : kServerSpecs) {
+            specs.push_back(serverSpec);
+        }
+    }
+};
+
+// Supplies the connection spec for a single-server setup on localhost port 18590.
+struct OneServerFixture {
+    std::vector<vespalib::string> specs;
+    OneServerFixture() : specs() {
+        specs.emplace_back("tcp/localhost:18590");
+    }
+};
+
+}
+
+// Subscribe failover test. f1 is the ThreeServersFixture and f2 the
+// NetworkFixture built from f1.specs. Thread 0 acts as the client; every other
+// thread runs the server with index thread_id - 1.
+// NOTE(review): the literal 4 is presumably the thread count (1 client + 3 servers) -- confirm against the testkit docs.
+TEST_MT_FF("require that any node can be down when subscribing",
+ 4,
+ ThreeServersFixture(),
+ NetworkFixture(f1.specs))
+{
+ if (thread_id == 0) {
+ ConfigCheckFixture ccf(f2);
+ f2.waitAll(); // rendezvous with each server thread, pairing with the first wait() in ServerFixture::start()
+ ccf.checkSubscribe(); // baseline: no server has been stopped
+ ccf.verifySubscribeFailover(0);
+ ccf.verifySubscribeFailover(1);
+ ccf.verifySubscribeFailover(2);
+ f2.stopAll(); // clears the running flag so the run() loops in the server threads exit
+ TEST_BARRIER();
+ } else {
+ f2.run(thread_id - 1);
+ TEST_BARRIER();
+ }
+}
+/*
+TEST_MT_FF("require that two out of three nodes can be down when subscribing",
+ 4,
+ ThreeServersFixture(),
+ NetworkFixture(f1.specs))
+{
+ if (thread_id == 0) {
+ ConfigCheckFixture ccf(f2);
+ f2.waitAll();
+ ccf.checkSubscribe();
+ ccf.verifySubscribeFailover(0, 1);
+ ccf.verifySubscribeFailover(1, 2);
+ ccf.verifySubscribeFailover(0, 2);
+ f2.stopAll();
+ TEST_BARRIER();
+ } else {
+ f2.run(thread_id - 1);
+ TEST_BARRIER();
+ }
+}
+
+TEST_MT_FF("require that any node can be down when waiting for next generation",
+ 4,
+ ThreeServersFixture(),
+ NetworkFixture(f1.specs))
+{
+ if (thread_id == 0) {
+ f2.waitAll();
+ ConfigReloadFixture crf(f2);
+ crf.verifyReload();
+ crf.verifyReloadFailover(0);
+ crf.verifyReloadFailover(1);
+ crf.verifyReloadFailover(2);
+ f2.stopAll();
+ TEST_BARRIER();
+ } else { f2.run(thread_id - 1); TEST_BARRIER();
+ }
+}
+
+TEST_MT_FF("require that two out of three nodes can be down when waiting for next generation",
+ 4,
+ ThreeServersFixture(),
+ NetworkFixture(f1.specs))
+{
+ if (thread_id == 0) {
+ f2.waitAll();
+ ConfigReloadFixture crf(f2);
+ crf.verifyReload();
+ crf.verifyReloadFailover(0, 1);
+ crf.verifyReloadFailover(1, 2);
+ crf.verifyReloadFailover(0, 2);
+ f2.stopAll();
+ TEST_BARRIER();
+ } else {
+ f2.run(thread_id - 1);
+ TEST_BARRIER();
+ }
+}
+*/
+
+// Test program entry point; TEST_RUN_ALL() is expected to execute the tests declared above.
+TEST_MAIN() { TEST_RUN_ALL(); }