diff options
Diffstat (limited to 'messagebus_test/src/tests/trace/trace.cpp')
-rw-r--r-- | messagebus_test/src/tests/trace/trace.cpp | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/messagebus_test/src/tests/trace/trace.cpp b/messagebus_test/src/tests/trace/trace.cpp new file mode 100644 index 00000000000..94550460c84 --- /dev/null +++ b/messagebus_test/src/tests/trace/trace.cpp @@ -0,0 +1,113 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("trace_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/sourcesession.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/intermediatesession.h> +#include <vespa/messagebus/destinationsession.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/testserver.h> +#include <vespa/messagebus/routing/routingspec.h> +#include <vespa/messagebus/testlib/receptor.h> +#include <vespa/messagebus/sourcesessionparams.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <iostream> + +using namespace mbus; +using vespalib::make_string; + +TEST_SETUP(Test); + +bool +waitSlobrok(RPCMessageBus &mbus, const std::string &pattern) +{ + for (int i = 0; i < 30000; i++) { + slobrok::api::MirrorAPI::SpecList res = mbus.getRPCNetwork().getMirror().lookup(pattern); + if (res.size() > 0) { + return true; + } + FastOS_Thread::Sleep(10); + } + return false; +} + +int +Test::Main() +{ + TEST_INIT("trace_test"); + Slobrok slobrok; + { // Make slobrok config + EXPECT_TRUE(system("echo slobrok[1] > slobrok.cfg") == 0); + EXPECT_TRUE(system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' " + ">> slobrok.cfg", slobrok.port()).c_str()) == 0); + } + EXPECT_TRUE(system("sh ctl.sh start all") == 0); + RPCMessageBus mb(ProtocolSet().add(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams().setSlobrokConfig("file:slobrok.cfg"), + "file:routing.cfg"); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/1/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/2/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/2/B/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/B/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/C/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/D/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/1/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/2/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/2/B/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/3/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/3/B/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/3/C/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/3/D/session")); + + TraceNode e3 = TraceNode() + .addChild(TraceNode().addChild("server/cpp/3/A (message)").addChild("server/cpp/3/A (reply)")) + .addChild(TraceNode().addChild("server/cpp/3/B (message)").addChild("server/cpp/3/B (reply)")) + .addChild(TraceNode().addChild("server/cpp/3/C (message)").addChild("server/cpp/3/C (reply)")) + .addChild(TraceNode().addChild("server/cpp/3/D (message)").addChild("server/cpp/3/D (reply)")) + .addChild(TraceNode().addChild("server/java/3/A (message)").addChild("server/java/3/A (reply)")) + .addChild(TraceNode().addChild("server/java/3/B (message)").addChild("server/java/3/B (reply)")) + .addChild(TraceNode().addChild("server/java/3/C (message)").addChild("server/java/3/C (reply)")) + .addChild(TraceNode().addChild("server/java/3/D (message)").addChild("server/java/3/D (reply)")).setStrict(false); + TraceNode e2 = TraceNode() + .addChild(TraceNode().addChild("server/cpp/2/A (message)").addChild(e3).addChild("server/cpp/2/A (reply)")) + .addChild(TraceNode().addChild("server/cpp/2/B (message)").addChild(e3).addChild("server/cpp/2/B (reply)")) + .addChild(TraceNode().addChild("server/java/2/A (message)").addChild(e3).addChild("server/java/2/A (reply)")) + .addChild(TraceNode().addChild("server/java/2/B (message)").addChild(e3).addChild("server/java/2/B (reply)")).setStrict(false); + TraceNode expect = TraceNode() + .addChild(TraceNode().addChild("server/cpp/1/A (message)").addChild(e2).addChild("server/cpp/1/A (reply)")) + .addChild(TraceNode().addChild("server/java/1/A (message)").addChild(e2).addChild("server/java/1/A (reply)")).setStrict(false); + expect.normalize(); + + Receptor src; + Reply::UP reply; + SourceSession::UP ss = mb.getMessageBus().createSourceSession(src, SourceSessionParams()); + for (int i = 0; i < 50; ++i) { + Message::UP msg(new SimpleMessage("test")); + msg->getTrace().setLevel(1); + ss->send(std::move(msg), "test"); + reply = src.getReply(10); + if (reply.get() != NULL) { + reply->getTrace().getRoot().normalize(); + // resending breaks the trace, so retry until it has expected form + if (!reply->hasErrors() && reply->getTrace().getRoot().encode() == expect.encode()) { + break; + } + } + std::cout << "Attempt " << i << " got errors, retrying in 1 second.." << std::endl; + FastOS_Thread::Sleep(1000); + } + + EXPECT_TRUE(!reply->hasErrors()); + EXPECT_EQUAL(reply->getTrace().getRoot().encode(), expect.encode()); + EXPECT_TRUE(system("sh ctl.sh stop all") == 0); + TEST_DONE(); +} |