diff options
Diffstat (limited to 'messagebus_test/src/tests/trace')
-rw-r--r-- | messagebus_test/src/tests/trace/.gitignore | 12 | ||||
-rw-r--r-- | messagebus_test/src/tests/trace/CMakeLists.txt | 13 | ||||
-rw-r--r-- | messagebus_test/src/tests/trace/DESC | 1 | ||||
-rw-r--r-- | messagebus_test/src/tests/trace/FILES | 19 | ||||
-rw-r--r-- | messagebus_test/src/tests/trace/JavaServer.java | 97 | ||||
-rw-r--r-- | messagebus_test/src/tests/trace/cpp-server.cpp | 90 | ||||
-rwxr-xr-x | messagebus_test/src/tests/trace/ctl.sh | 4 | ||||
-rw-r--r-- | messagebus_test/src/tests/trace/progdefs.sh | 15 | ||||
-rw-r--r-- | messagebus_test/src/tests/trace/trace.cpp | 113 | ||||
-rw-r--r-- | messagebus_test/src/tests/trace/trace_test.sh | 8 |
10 files changed, 372 insertions, 0 deletions
diff --git a/messagebus_test/src/tests/trace/.gitignore b/messagebus_test/src/tests/trace/.gitignore new file mode 100644 index 00000000000..1be907ec2f8 --- /dev/null +++ b/messagebus_test/src/tests/trace/.gitignore @@ -0,0 +1,12 @@ +*.class +.depend +Makefile +cpp-server +out.* +pid.* +routing.cfg +slobrok.cfg +trace_test +/cpp-server-trace +messagebus_test_trace_test_app +messagebus_test_cpp-server-trace_app diff --git a/messagebus_test/src/tests/trace/CMakeLists.txt b/messagebus_test/src/tests/trace/CMakeLists.txt new file mode 100644 index 00000000000..af539fa2818 --- /dev/null +++ b/messagebus_test/src/tests/trace/CMakeLists.txt @@ -0,0 +1,13 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(messagebus_test_trace_test_app TEST + SOURCES + trace.cpp + DEPENDS +) +vespa_add_executable(messagebus_test_cpp-server-trace_app + SOURCES + cpp-server.cpp + DEPENDS +) +vespa_add_test(NAME messagebus_test_trace_test_app NO_VALGRIND COMMAND sh trace_test.sh + DEPENDS messagebus_test_trace_test_app messagebus_test_cpp-server-trace_app) diff --git a/messagebus_test/src/tests/trace/DESC b/messagebus_test/src/tests/trace/DESC new file mode 100644 index 00000000000..452e75aefea --- /dev/null +++ b/messagebus_test/src/tests/trace/DESC @@ -0,0 +1 @@ +trace test. Take a look at trace.cpp for details. diff --git a/messagebus_test/src/tests/trace/FILES b/messagebus_test/src/tests/trace/FILES new file mode 100644 index 00000000000..891e4df6273 --- /dev/null +++ b/messagebus_test/src/tests/trace/FILES @@ -0,0 +1,19 @@ +trace.cpp +cpp-server.cpp +JavaServer.java +routing.cfg +out.server.cpp1 +out.server.cpp2 +out.server.cpp3 +out.server.cpp4 +out.server.cpp5 +out.server.cpp6 +out.server.cpp7 +out.server.java1 +out.server.java2 +out.server.java3 +out.server.java4 +out.server.java5 +out.server.java6 +out.server.java7 +progdefs.sh diff --git a/messagebus_test/src/tests/trace/JavaServer.java b/messagebus_test/src/tests/trace/JavaServer.java new file mode 100644 index 00000000000..5dfe15e3d0b --- /dev/null +++ b/messagebus_test/src/tests/trace/JavaServer.java @@ -0,0 +1,97 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.test.*; +import com.yahoo.config.*; +import com.yahoo.messagebus.routing.*; +import com.yahoo.messagebus.network.*; +import com.yahoo.messagebus.network.rpc.*; +import java.util.Arrays; +import java.util.logging.*; + +public class JavaServer implements MessageHandler, ReplyHandler { + + private static Logger log = Logger.getLogger(JavaServer.class.getName()); + + private IntermediateSession session; + private String name; + + public JavaServer(RPCMessageBus mb, String name) { + session = mb.getMessageBus().createIntermediateSession("session", true, this, this); + this.name = name; + } + + public void handleMessage(Message msg) { + msg.getTrace().trace(1, name + " (message)", false); + if (msg.getRoute() == null || !msg.getRoute().hasHops()) { + System.out.println("**** Server '" + name + "' replying."); + Reply reply = new EmptyReply(); + msg.swapState(reply); + handleReply(reply); + } else { + System.out.println("**** Server '" + name + "' forwarding message."); + session.forward(msg); + } + } + + public void handleReply(Reply reply) { + reply.getTrace().trace(1, name + " (reply)", false); + session.forward(reply); + } + + public static void main(String[] args) { + if (args.length != 1) { + System.err.println("usage: JavaServer <service prefix>"); + System.exit(1); + } + String name = args[0]; + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("All", new SimpleProtocol.PolicyFactory() { + @Override + public RoutingPolicy create(String param) { + return new AllPolicy(); + } + }); + try { + RPCMessageBus mb = new RPCMessageBus( + Arrays.<Protocol>asList(protocol), + new RPCNetworkParams() + .setIdentity(new Identity(name)) + .setSlobrokConfigId("file:slobrok.cfg"), + "file:routing.cfg"); + JavaServer server = new JavaServer(mb, name); + System.out.printf("java server started name=%s\n", name); + while (true) { + Thread.sleep(1000); + } + } catch (Exception e) { + log.log(Level.SEVERE, "JAVA-SERVER: Failed", e); + System.exit(1); + } + } + + private static class AllPolicy implements RoutingPolicy { + + @Override + public void select(RoutingContext ctx) { + ctx.addChildren(ctx.getMatchedRecipients()); + } + + @Override + public void merge(RoutingContext ctx) { + EmptyReply ret = new EmptyReply(); + for (RoutingNodeIterator it = ctx.getChildIterator(); + it.isValid(); it.next()) { + Reply reply = it.getReplyRef(); + for (int i = 0; i < reply.getNumErrors(); ++i) { + ret.addError(reply.getError(i)); + } + } + ctx.setReply(ret); + } + + @Override + public void destroy() { + + } + } +} diff --git a/messagebus_test/src/tests/trace/cpp-server.cpp b/messagebus_test/src/tests/trace/cpp-server.cpp new file mode 100644 index 00000000000..76e20bc3cfd --- /dev/null +++ b/messagebus_test/src/tests/trace/cpp-server.cpp @@ -0,0 +1,90 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("cpp-server"); +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/destinationsession.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/iprotocol.h> +#include <vespa/messagebus/protocolset.h> +#include <vespa/messagebus/emptyreply.h> + +using namespace mbus; + +class Server : public IMessageHandler, + public IReplyHandler +{ +private: + IntermediateSession::UP _session; + std::string _name; +public: + Server(MessageBus &bus, const std::string &name); + ~Server(); + void handleMessage(Message::UP msg); + void handleReply(Reply::UP reply); +}; + +Server::Server(MessageBus &bus, const std::string &name) + : _session(bus.createIntermediateSession("session", true, *this, *this)), + _name(name) +{ + fprintf(stderr, "cpp server started: %s\n", _name.c_str()); +} + +Server::~Server() +{ + _session.reset(); +} + +void +Server::handleMessage(Message::UP msg) { + msg->getTrace().trace(1, _name + " (message)", false); + if (!msg->getRoute().hasHops()) { + fprintf(stderr, "**** Server '%s' replying.\n", _name.c_str()); + Reply::UP reply(new EmptyReply()); + msg->swapState(*reply); + handleReply(std::move(reply)); + } else { + fprintf(stderr, "**** Server '%s' forwarding message.\n", _name.c_str()); + _session->forward(std::move(msg)); + } +} + +void +Server::handleReply(Reply::UP reply) { + reply->getTrace().trace(1, _name + " (reply)", false); + _session->forward(std::move(reply)); +} + +class App : public FastOS_Application +{ +public: + int Main(); +}; + +int +App::Main() +{ + if (_argc != 2) { + fprintf(stderr, "usage: %s <service-prefix>\n", _argv[0]); + return 1; + } + RPCMessageBus mb(ProtocolSet().add(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams() + .setIdentity(Identity(_argv[1])) + .setSlobrokConfig("file:slobrok.cfg"), + "file:routing.cfg"); + Server server(mb.getMessageBus(), _argv[1]); + while (true) { + FastOS_Thread::Sleep(1000); + } + return 0; +} + +int main(int argc, char **argv) { + App app; + return app.Entry(argc, argv); +} diff --git a/messagebus_test/src/tests/trace/ctl.sh b/messagebus_test/src/tests/trace/ctl.sh new file mode 100755 index 00000000000..864be4290ed --- /dev/null +++ b/messagebus_test/src/tests/trace/ctl.sh @@ -0,0 +1,4 @@ +#!/bin/bash +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +exec ../../binref/progctl.sh progdefs.sh "$@" diff --git a/messagebus_test/src/tests/trace/progdefs.sh b/messagebus_test/src/tests/trace/progdefs.sh new file mode 100644 index 00000000000..fd35b6503e2 --- /dev/null +++ b/messagebus_test/src/tests/trace/progdefs.sh @@ -0,0 +1,15 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +prog server cpp1 "" "./messagebus_test_cpp-server-trace_app server/cpp/1/A" +prog server cpp2 "" "./messagebus_test_cpp-server-trace_app server/cpp/2/A" +prog server cpp3 "" "./messagebus_test_cpp-server-trace_app server/cpp/2/B" +prog server cpp4 "" "./messagebus_test_cpp-server-trace_app server/cpp/3/A" +prog server cpp5 "" "./messagebus_test_cpp-server-trace_app server/cpp/3/B" +prog server cpp6 "" "./messagebus_test_cpp-server-trace_app server/cpp/3/C" +prog server cpp7 "" "./messagebus_test_cpp-server-trace_app server/cpp/3/D" +prog server java1 "" "../../binref/runjava JavaServer server/java/1/A" +prog server java2 "" "../../binref/runjava JavaServer server/java/2/A" +prog server java3 "" "../../binref/runjava JavaServer server/java/2/B" +prog server java4 "" "../../binref/runjava JavaServer server/java/3/A" +prog server java5 "" "../../binref/runjava JavaServer server/java/3/B" +prog server java6 "" "../../binref/runjava JavaServer server/java/3/C" +prog server java7 "" "../../binref/runjava JavaServer server/java/3/D" diff --git a/messagebus_test/src/tests/trace/trace.cpp b/messagebus_test/src/tests/trace/trace.cpp new file mode 100644 index 00000000000..94550460c84 --- /dev/null +++ b/messagebus_test/src/tests/trace/trace.cpp @@ -0,0 +1,113 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("trace_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/sourcesession.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/intermediatesession.h> +#include <vespa/messagebus/destinationsession.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/testserver.h> +#include <vespa/messagebus/routing/routingspec.h> +#include <vespa/messagebus/testlib/receptor.h> +#include <vespa/messagebus/sourcesessionparams.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <iostream> + +using namespace mbus; +using vespalib::make_string; + +TEST_SETUP(Test); + +bool +waitSlobrok(RPCMessageBus &mbus, const std::string &pattern) +{ + for (int i = 0; i < 30000; i++) { + slobrok::api::MirrorAPI::SpecList res = mbus.getRPCNetwork().getMirror().lookup(pattern); + if (res.size() > 0) { + return true; + } + FastOS_Thread::Sleep(10); + } + return false; +} + +int +Test::Main() +{ + TEST_INIT("trace_test"); + Slobrok slobrok; + { // Make slobrok config + EXPECT_TRUE(system("echo slobrok[1] > slobrok.cfg") == 0); + EXPECT_TRUE(system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' " + ">> slobrok.cfg", slobrok.port()).c_str()) == 0); + } + EXPECT_TRUE(system("sh ctl.sh start all") == 0); + RPCMessageBus mb(ProtocolSet().add(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams().setSlobrokConfig("file:slobrok.cfg"), + "file:routing.cfg"); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/1/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/2/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/2/B/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/B/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/C/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/D/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/1/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/2/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/2/B/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/3/A/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/3/B/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/3/C/session")); + EXPECT_TRUE(waitSlobrok(mb, "server/java/3/D/session")); + + TraceNode e3 = TraceNode() + .addChild(TraceNode().addChild("server/cpp/3/A (message)").addChild("server/cpp/3/A (reply)")) + .addChild(TraceNode().addChild("server/cpp/3/B (message)").addChild("server/cpp/3/B (reply)")) + .addChild(TraceNode().addChild("server/cpp/3/C (message)").addChild("server/cpp/3/C (reply)")) + .addChild(TraceNode().addChild("server/cpp/3/D (message)").addChild("server/cpp/3/D (reply)")) + .addChild(TraceNode().addChild("server/java/3/A (message)").addChild("server/java/3/A (reply)")) + .addChild(TraceNode().addChild("server/java/3/B (message)").addChild("server/java/3/B (reply)")) + .addChild(TraceNode().addChild("server/java/3/C (message)").addChild("server/java/3/C (reply)")) + .addChild(TraceNode().addChild("server/java/3/D (message)").addChild("server/java/3/D (reply)")).setStrict(false); + TraceNode e2 = TraceNode() + .addChild(TraceNode().addChild("server/cpp/2/A (message)").addChild(e3).addChild("server/cpp/2/A (reply)")) + .addChild(TraceNode().addChild("server/cpp/2/B (message)").addChild(e3).addChild("server/cpp/2/B (reply)")) + .addChild(TraceNode().addChild("server/java/2/A (message)").addChild(e3).addChild("server/java/2/A (reply)")) + .addChild(TraceNode().addChild("server/java/2/B (message)").addChild(e3).addChild("server/java/2/B (reply)")).setStrict(false); + TraceNode expect = TraceNode() + .addChild(TraceNode().addChild("server/cpp/1/A (message)").addChild(e2).addChild("server/cpp/1/A (reply)")) + .addChild(TraceNode().addChild("server/java/1/A (message)").addChild(e2).addChild("server/java/1/A (reply)")).setStrict(false); + expect.normalize(); + + Receptor src; + Reply::UP reply; + SourceSession::UP ss = mb.getMessageBus().createSourceSession(src, SourceSessionParams()); + for (int i = 0; i < 50; ++i) { + Message::UP msg(new SimpleMessage("test")); + msg->getTrace().setLevel(1); + ss->send(std::move(msg), "test"); + reply = src.getReply(10); + if (reply.get() != NULL) { + reply->getTrace().getRoot().normalize(); + // resending breaks the trace, so retry until it has expected form + if (!reply->hasErrors() && reply->getTrace().getRoot().encode() == expect.encode()) { + break; + } + } + std::cout << "Attempt " << i << " got errors, retrying in 1 second.." << std::endl; + FastOS_Thread::Sleep(1000); + } + + EXPECT_TRUE(!reply->hasErrors()); + EXPECT_EQUAL(reply->getTrace().getRoot().encode(), expect.encode()); + EXPECT_TRUE(system("sh ctl.sh stop all") == 0); + TEST_DONE(); +} diff --git a/messagebus_test/src/tests/trace/trace_test.sh b/messagebus_test/src/tests/trace/trace_test.sh new file mode 100644 index 00000000000..63005385332 --- /dev/null +++ b/messagebus_test/src/tests/trace/trace_test.sh @@ -0,0 +1,8 @@ +#!/bin/bash +set -e + +. ../../binref/env.sh + +$BINREF/compilejava JavaServer.java + +./messagebus_test_trace_test_app |