diff options
Diffstat (limited to 'config/src/tests/failover/failover.cpp')
-rw-r--r-- | config/src/tests/failover/failover.cpp | 356 |
1 files changed, 356 insertions, 0 deletions
diff --git a/config/src/tests/failover/failover.cpp b/config/src/tests/failover/failover.cpp new file mode 100644 index 00000000000..baaae8ba19a --- /dev/null +++ b/config/src/tests/failover/failover.cpp @@ -0,0 +1,356 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("failover"); +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/config/common/misc.h> +#include <vespa/config/frt/protocol.h> +#include <vespa/config/config.h> +#include <vespa/fnet/frt/frt.h> +#include "config-my.h" +#include <vespa/vespalib/data/slime/slime.h> + +using namespace config; +using vespalib::Barrier; +using namespace config::protocol::v2; +using namespace vespalib::slime; +using namespace vespalib; + +namespace { + +int get_port(const vespalib::string &spec) { + const char *port = (spec.data() + spec.size()); + while ((port > spec.data()) && (port[-1] >= '0') && (port[-1] <= '9')) { + --port; + } + return atoi(port); +} + +const vespalib::string requestTypes = "s"; +const vespalib::string responseTypes = "sx"; + +struct RPCServer : public FRT_Invokable { + FRT_Supervisor * supervisor; + vespalib::Barrier barrier; + int64_t gen; + + RPCServer() : supervisor(NULL), barrier(2), gen(1) { } + + void init(FRT_Supervisor * s) { + FRT_ReflectionBuilder rb(s); + rb.DefineMethod("config.v3.getConfig", requestTypes.c_str(), responseTypes.c_str(), true, + FRT_METHOD(RPCServer::getConfig), this); + } + + void getConfig(FRT_RPCRequest * req) + { + Slime slime; + Cursor & root(slime.setObject()); + root.setLong(RESPONSE_VERSION, 3); + root.setString(RESPONSE_DEF_NAME, Memory(MyConfig::CONFIG_DEF_NAME)); + root.setString(RESPONSE_DEF_NAMESPACE, Memory(MyConfig::CONFIG_DEF_NAMESPACE)); + root.setString(RESPONSE_DEF_MD5, Memory(MyConfig::CONFIG_DEF_MD5)); + Cursor &info = root.setObject("compressionInfo"); + info.setString("compressionType", "UNCOMPRESSED"); + info.setString("uncompressedSize", "0"); + root.setString(RESPONSE_CONFIGID, "myId"); + root.setString(RESPONSE_CLIENT_HOSTNAME, "myhost"); + root.setString(RESPONSE_CONFIG_MD5, "md5"); + root.setLong(RESPONSE_CONFIG_GENERATION, gen); + root.setObject(RESPONSE_TRACE); + Slime payload; + payload.setObject().setString("myField", "myval"); + + FRT_Values & ret = *req->GetReturn(); + SimpleBuffer buf; + JsonFormat::encode(slime, buf, false); + ret.AddString(buf.get().make_string().c_str()); + + SimpleBuffer pbuf; + JsonFormat::encode(payload, pbuf, false); + vespalib::string d = pbuf.get().make_string(); + ret.AddData(d.c_str(), d.size()); + LOG(info, "Answering..."); + } + void wait() { + barrier.await(); + } + void reload() { gen++; } +}; + + +void verifyConfig(std::unique_ptr<MyConfig> config) +{ + ASSERT_TRUE(config.get() != NULL); + ASSERT_EQUAL("myval", config->myField); +} + +struct ServerFixture { + typedef vespalib::LinkedPtr<ServerFixture> LP; + FRT_Supervisor * supervisor; + RPCServer server; + Barrier b; + const vespalib::string listenSpec; + ServerFixture(const vespalib::string & ls) + : supervisor(NULL), + server(), + b(2), + listenSpec(ls) + { + } + + void wait() + { + b.await(); + } + + void start() + { + supervisor = new FRT_Supervisor(); + server.init(supervisor); + supervisor->Listen(get_port(listenSpec)); + wait(); // Wait until test runner signals we can start + supervisor->Main(); + wait(); // Signalling that we have shut down + wait(); // Wait for signal saying that supervisor is deleted + } + + void stop() + { + if (supervisor != NULL) { + supervisor->ShutDown(true); + wait(); // Wait for supervisor to shut down + delete supervisor; + supervisor = NULL; + wait(); // Signal that we are done and start can return. + } + } + + ~ServerFixture() { stop(); } +}; + +struct NetworkFixture { + std::vector<ServerFixture::LP> serverList; + ServerSpec spec; + bool running; + NetworkFixture(const std::vector<vespalib::string> & serverSpecs) + : spec(serverSpecs), running(true) + { + for (size_t i = 0; i < serverSpecs.size(); i++) { + serverList.push_back(ServerFixture::LP(new ServerFixture(serverSpecs[i]))); + } + } + void start(size_t i) { + serverList[i]->start(); + } + void wait(size_t i) { + serverList[i]->wait(); + } + void waitAll() { + for (size_t i = 0; i < serverList.size(); i++) { + serverList[i]->wait(); + } + } + void run(size_t i) { + while (running) { + serverList[i]->start(); + } + } + void stopAll() { + running = false; + for (size_t i = 0; i < serverList.size(); i++) { + serverList[i]->stop(); + } + } + void stop(size_t i) { + serverList[i]->stop(); + } + void reload() { + for (size_t i = 0; i < serverList.size(); i++) { + serverList[i]->server.reload(); + } + } +}; + + +TimingValues testTimingValues( + 500, // successTimeout + 500, // errorTimeout + 500, // initialTimeout + 400, // unsubscribeTimeout + 0, // fixedDelay + 250, // successDelay + 250, // unconfiguredDelay + 500, // configuredErrorDelay + 1, // maxDelayMultiplier + 600, // transientDelay + 1200); // fatalDelay + +struct ConfigCheckFixture { + IConfigContext::SP ctx; + NetworkFixture & nf; + + ConfigCheckFixture(NetworkFixture & f2) + : ctx(new ConfigContext(testTimingValues, f2.spec)), + nf(f2) + { + } + void checkSubscribe() + { + ConfigSubscriber s(ctx); + ConfigHandle<MyConfig>::UP handle = s.subscribe<MyConfig>("myId"); + ASSERT_TRUE(s.nextConfig()); + } + void verifySubscribeFailover(size_t index) + { + nf.stop(index); + checkSubscribe(); + nf.wait(index); + } + + void verifySubscribeFailover(size_t indexA, size_t indexB) + { + nf.stop(indexA); + nf.stop(indexB); + checkSubscribe(); + nf.wait(indexA); + nf.wait(indexB); + } +}; + +struct ConfigReloadFixture { + IConfigContext::SP ctx; + NetworkFixture & nf; + ConfigSubscriber s; + ConfigHandle<MyConfig>::UP handle; + + ConfigReloadFixture(NetworkFixture & f2) + : ctx(new ConfigContext(testTimingValues, f2.spec)), + nf(f2), + s(ctx), + handle(s.subscribe<MyConfig>("myId")) + { + } + + void verifyReload() + { + nf.reload(); + ASSERT_TRUE(s.nextGeneration()); + verifyConfig(handle->getConfig()); + } + + void verifyReloadFailover(size_t index) + { + nf.stop(index); + verifyReload(); + nf.wait(index); + } + + void verifyReloadFailover(size_t indexA, size_t indexB) + { + nf.stop(indexA); + nf.stop(indexB); + verifyReload(); + nf.wait(indexA); + nf.wait(indexB); + } +}; + +struct ThreeServersFixture { + std::vector<vespalib::string> specs; + ThreeServersFixture() : specs() { + specs.push_back("tcp/localhost:18590"); + specs.push_back("tcp/localhost:18592"); + specs.push_back("tcp/localhost:18594"); + } +}; + +struct OneServerFixture { + std::vector<vespalib::string> specs; + OneServerFixture() : specs() { + specs.push_back("tcp/localhost:18590"); + } +}; + +} + +TEST_MT_FF("require that any node can be down when subscribing", + 4, + ThreeServersFixture(), + NetworkFixture(f1.specs)) +{ + if (thread_id == 0) { + ConfigCheckFixture ccf(f2); + f2.waitAll(); + ccf.checkSubscribe(); + ccf.verifySubscribeFailover(0); + ccf.verifySubscribeFailover(1); + ccf.verifySubscribeFailover(2); + f2.stopAll(); + TEST_BARRIER(); + } else { + f2.run(thread_id - 1); + TEST_BARRIER(); + } +} +/* +TEST_MT_FF("require that two out of three nodes can be down when subscribing", + 4, + ThreeServersFixture(), + NetworkFixture(f1.specs)) +{ + if (thread_id == 0) { + ConfigCheckFixture ccf(f2); + f2.waitAll(); + ccf.checkSubscribe(); + ccf.verifySubscribeFailover(0, 1); + ccf.verifySubscribeFailover(1, 2); + ccf.verifySubscribeFailover(0, 2); + f2.stopAll(); + TEST_BARRIER(); + } else { + f2.run(thread_id - 1); + TEST_BARRIER(); + } +} + +TEST_MT_FF("require that any node can be down when waiting for next generation", + 4, + ThreeServersFixture(), + NetworkFixture(f1.specs)) +{ + if (thread_id == 0) { + f2.waitAll(); + ConfigReloadFixture crf(f2); + crf.verifyReload(); + crf.verifyReloadFailover(0); + crf.verifyReloadFailover(1); + crf.verifyReloadFailover(2); + f2.stopAll(); + TEST_BARRIER(); + } else { f2.run(thread_id - 1); TEST_BARRIER(); + } +} + +TEST_MT_FF("require that two out of three nodes can be down when waiting for next generation", + 4, + ThreeServersFixture(), + NetworkFixture(f1.specs)) +{ + if (thread_id == 0) { + f2.waitAll(); + ConfigReloadFixture crf(f2); + crf.verifyReload(); + crf.verifyReloadFailover(0, 1); + crf.verifyReloadFailover(1, 2); + crf.verifyReloadFailover(0, 2); + f2.stopAll(); + TEST_BARRIER(); + } else { + f2.run(thread_id - 1); + TEST_BARRIER(); + } +} +*/ + +TEST_MAIN() { TEST_RUN_ALL(); } |