summaryrefslogtreecommitdiffstats
path: root/messagebus_test/src/tests/trace
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus_test/src/tests/trace')
-rw-r--r--messagebus_test/src/tests/trace/.gitignore12
-rw-r--r--messagebus_test/src/tests/trace/CMakeLists.txt13
-rw-r--r--messagebus_test/src/tests/trace/DESC1
-rw-r--r--messagebus_test/src/tests/trace/FILES19
-rw-r--r--messagebus_test/src/tests/trace/JavaServer.java97
-rw-r--r--messagebus_test/src/tests/trace/cpp-server.cpp90
-rwxr-xr-xmessagebus_test/src/tests/trace/ctl.sh4
-rw-r--r--messagebus_test/src/tests/trace/progdefs.sh15
-rw-r--r--messagebus_test/src/tests/trace/trace.cpp113
-rw-r--r--messagebus_test/src/tests/trace/trace_test.sh8
10 files changed, 372 insertions, 0 deletions
diff --git a/messagebus_test/src/tests/trace/.gitignore b/messagebus_test/src/tests/trace/.gitignore
new file mode 100644
index 00000000000..1be907ec2f8
--- /dev/null
+++ b/messagebus_test/src/tests/trace/.gitignore
@@ -0,0 +1,12 @@
+*.class
+.depend
+Makefile
+cpp-server
+out.*
+pid.*
+routing.cfg
+slobrok.cfg
+trace_test
+/cpp-server-trace
+messagebus_test_trace_test_app
+messagebus_test_cpp-server-trace_app
diff --git a/messagebus_test/src/tests/trace/CMakeLists.txt b/messagebus_test/src/tests/trace/CMakeLists.txt
new file mode 100644
index 00000000000..af539fa2818
--- /dev/null
+++ b/messagebus_test/src/tests/trace/CMakeLists.txt
@@ -0,0 +1,13 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(messagebus_test_trace_test_app TEST
+ SOURCES
+ trace.cpp
+ DEPENDS
+)
+vespa_add_executable(messagebus_test_cpp-server-trace_app
+ SOURCES
+ cpp-server.cpp
+ DEPENDS
+)
+vespa_add_test(NAME messagebus_test_trace_test_app NO_VALGRIND COMMAND sh trace_test.sh
+ DEPENDS messagebus_test_trace_test_app messagebus_test_cpp-server-trace_app)
diff --git a/messagebus_test/src/tests/trace/DESC b/messagebus_test/src/tests/trace/DESC
new file mode 100644
index 00000000000..452e75aefea
--- /dev/null
+++ b/messagebus_test/src/tests/trace/DESC
@@ -0,0 +1 @@
+trace test. Take a look at trace.cpp for details.
diff --git a/messagebus_test/src/tests/trace/FILES b/messagebus_test/src/tests/trace/FILES
new file mode 100644
index 00000000000..891e4df6273
--- /dev/null
+++ b/messagebus_test/src/tests/trace/FILES
@@ -0,0 +1,19 @@
+trace.cpp
+cpp-server.cpp
+JavaServer.java
+routing.cfg
+out.server.cpp1
+out.server.cpp2
+out.server.cpp3
+out.server.cpp4
+out.server.cpp5
+out.server.cpp6
+out.server.cpp7
+out.server.java1
+out.server.java2
+out.server.java3
+out.server.java4
+out.server.java5
+out.server.java6
+out.server.java7
+progdefs.sh
diff --git a/messagebus_test/src/tests/trace/JavaServer.java b/messagebus_test/src/tests/trace/JavaServer.java
new file mode 100644
index 00000000000..5dfe15e3d0b
--- /dev/null
+++ b/messagebus_test/src/tests/trace/JavaServer.java
@@ -0,0 +1,97 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.test.*;
+import com.yahoo.config.*;
+import com.yahoo.messagebus.routing.*;
+import com.yahoo.messagebus.network.*;
+import com.yahoo.messagebus.network.rpc.*;
+import java.util.Arrays;
+import java.util.logging.*;
+
+public class JavaServer implements MessageHandler, ReplyHandler {
+
+ private static Logger log = Logger.getLogger(JavaServer.class.getName());
+
+ private IntermediateSession session;
+ private String name;
+
+ public JavaServer(RPCMessageBus mb, String name) {
+ session = mb.getMessageBus().createIntermediateSession("session", true, this, this);
+ this.name = name;
+ }
+
+ public void handleMessage(Message msg) {
+ msg.getTrace().trace(1, name + " (message)", false);
+ if (msg.getRoute() == null || !msg.getRoute().hasHops()) {
+ System.out.println("**** Server '" + name + "' replying.");
+ Reply reply = new EmptyReply();
+ msg.swapState(reply);
+ handleReply(reply);
+ } else {
+ System.out.println("**** Server '" + name + "' forwarding message.");
+ session.forward(msg);
+ }
+ }
+
+ public void handleReply(Reply reply) {
+ reply.getTrace().trace(1, name + " (reply)", false);
+ session.forward(reply);
+ }
+
+ public static void main(String[] args) {
+ if (args.length != 1) {
+ System.err.println("usage: JavaServer <service prefix>");
+ System.exit(1);
+ }
+ String name = args[0];
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("All", new SimpleProtocol.PolicyFactory() {
+ @Override
+ public RoutingPolicy create(String param) {
+ return new AllPolicy();
+ }
+ });
+ try {
+ RPCMessageBus mb = new RPCMessageBus(
+ Arrays.<Protocol>asList(protocol),
+ new RPCNetworkParams()
+ .setIdentity(new Identity(name))
+ .setSlobrokConfigId("file:slobrok.cfg"),
+ "file:routing.cfg");
+ JavaServer server = new JavaServer(mb, name);
+ System.out.printf("java server started name=%s\n", name);
+ while (true) {
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ log.log(Level.SEVERE, "JAVA-SERVER: Failed", e);
+ System.exit(1);
+ }
+ }
+
+ private static class AllPolicy implements RoutingPolicy {
+
+ @Override
+ public void select(RoutingContext ctx) {
+ ctx.addChildren(ctx.getMatchedRecipients());
+ }
+
+ @Override
+ public void merge(RoutingContext ctx) {
+ EmptyReply ret = new EmptyReply();
+ for (RoutingNodeIterator it = ctx.getChildIterator();
+ it.isValid(); it.next()) {
+ Reply reply = it.getReplyRef();
+ for (int i = 0; i < reply.getNumErrors(); ++i) {
+ ret.addError(reply.getError(i));
+ }
+ }
+ ctx.setReply(ret);
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ }
+}
diff --git a/messagebus_test/src/tests/trace/cpp-server.cpp b/messagebus_test/src/tests/trace/cpp-server.cpp
new file mode 100644
index 00000000000..76e20bc3cfd
--- /dev/null
+++ b/messagebus_test/src/tests/trace/cpp-server.cpp
@@ -0,0 +1,90 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("cpp-server");
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/destinationsession.h>
+#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simplereply.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/iprotocol.h>
+#include <vespa/messagebus/protocolset.h>
+#include <vespa/messagebus/emptyreply.h>
+
+using namespace mbus;
+
+class Server : public IMessageHandler,
+ public IReplyHandler
+{
+private:
+ IntermediateSession::UP _session;
+ std::string _name;
+public:
+ Server(MessageBus &bus, const std::string &name);
+ ~Server();
+ void handleMessage(Message::UP msg);
+ void handleReply(Reply::UP reply);
+};
+
+Server::Server(MessageBus &bus, const std::string &name)
+ : _session(bus.createIntermediateSession("session", true, *this, *this)),
+ _name(name)
+{
+ fprintf(stderr, "cpp server started: %s\n", _name.c_str());
+}
+
+Server::~Server()
+{
+ _session.reset();
+}
+
+void
+Server::handleMessage(Message::UP msg) {
+ msg->getTrace().trace(1, _name + " (message)", false);
+ if (!msg->getRoute().hasHops()) {
+ fprintf(stderr, "**** Server '%s' replying.\n", _name.c_str());
+ Reply::UP reply(new EmptyReply());
+ msg->swapState(*reply);
+ handleReply(std::move(reply));
+ } else {
+ fprintf(stderr, "**** Server '%s' forwarding message.\n", _name.c_str());
+ _session->forward(std::move(msg));
+ }
+}
+
+void
+Server::handleReply(Reply::UP reply) {
+ reply->getTrace().trace(1, _name + " (reply)", false);
+ _session->forward(std::move(reply));
+}
+
+class App : public FastOS_Application
+{
+public:
+ int Main();
+};
+
+int
+App::Main()
+{
+ if (_argc != 2) {
+ fprintf(stderr, "usage: %s <service-prefix>\n", _argv[0]);
+ return 1;
+ }
+ RPCMessageBus mb(ProtocolSet().add(IProtocol::SP(new SimpleProtocol())),
+ RPCNetworkParams()
+ .setIdentity(Identity(_argv[1]))
+ .setSlobrokConfig("file:slobrok.cfg"),
+ "file:routing.cfg");
+ Server server(mb.getMessageBus(), _argv[1]);
+ while (true) {
+ FastOS_Thread::Sleep(1000);
+ }
+ return 0;
+}
+
+int main(int argc, char **argv) {
+ App app;
+ return app.Entry(argc, argv);
+}
diff --git a/messagebus_test/src/tests/trace/ctl.sh b/messagebus_test/src/tests/trace/ctl.sh
new file mode 100755
index 00000000000..864be4290ed
--- /dev/null
+++ b/messagebus_test/src/tests/trace/ctl.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+exec ../../binref/progctl.sh progdefs.sh "$@"
diff --git a/messagebus_test/src/tests/trace/progdefs.sh b/messagebus_test/src/tests/trace/progdefs.sh
new file mode 100644
index 00000000000..fd35b6503e2
--- /dev/null
+++ b/messagebus_test/src/tests/trace/progdefs.sh
@@ -0,0 +1,15 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+prog server cpp1 "" "./messagebus_test_cpp-server-trace_app server/cpp/1/A"
+prog server cpp2 "" "./messagebus_test_cpp-server-trace_app server/cpp/2/A"
+prog server cpp3 "" "./messagebus_test_cpp-server-trace_app server/cpp/2/B"
+prog server cpp4 "" "./messagebus_test_cpp-server-trace_app server/cpp/3/A"
+prog server cpp5 "" "./messagebus_test_cpp-server-trace_app server/cpp/3/B"
+prog server cpp6 "" "./messagebus_test_cpp-server-trace_app server/cpp/3/C"
+prog server cpp7 "" "./messagebus_test_cpp-server-trace_app server/cpp/3/D"
+prog server java1 "" "../../binref/runjava JavaServer server/java/1/A"
+prog server java2 "" "../../binref/runjava JavaServer server/java/2/A"
+prog server java3 "" "../../binref/runjava JavaServer server/java/2/B"
+prog server java4 "" "../../binref/runjava JavaServer server/java/3/A"
+prog server java5 "" "../../binref/runjava JavaServer server/java/3/B"
+prog server java6 "" "../../binref/runjava JavaServer server/java/3/C"
+prog server java7 "" "../../binref/runjava JavaServer server/java/3/D"
diff --git a/messagebus_test/src/tests/trace/trace.cpp b/messagebus_test/src/tests/trace/trace.cpp
new file mode 100644
index 00000000000..94550460c84
--- /dev/null
+++ b/messagebus_test/src/tests/trace/trace.cpp
@@ -0,0 +1,113 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("trace_test");
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/sourcesession.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/intermediatesession.h>
+#include <vespa/messagebus/destinationsession.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/messagebus/testlib/testserver.h>
+#include <vespa/messagebus/routing/routingspec.h>
+#include <vespa/messagebus/testlib/receptor.h>
+#include <vespa/messagebus/sourcesessionparams.h>
+#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simplereply.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <iostream>
+
+using namespace mbus;
+using vespalib::make_string;
+
+TEST_SETUP(Test);
+
+bool
+waitSlobrok(RPCMessageBus &mbus, const std::string &pattern)
+{
+ for (int i = 0; i < 30000; i++) {
+ slobrok::api::MirrorAPI::SpecList res = mbus.getRPCNetwork().getMirror().lookup(pattern);
+ if (res.size() > 0) {
+ return true;
+ }
+ FastOS_Thread::Sleep(10);
+ }
+ return false;
+}
+
+int
+Test::Main()
+{
+ TEST_INIT("trace_test");
+ Slobrok slobrok;
+ { // Make slobrok config
+ EXPECT_TRUE(system("echo slobrok[1] > slobrok.cfg") == 0);
+ EXPECT_TRUE(system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' "
+ ">> slobrok.cfg", slobrok.port()).c_str()) == 0);
+ }
+ EXPECT_TRUE(system("sh ctl.sh start all") == 0);
+ RPCMessageBus mb(ProtocolSet().add(IProtocol::SP(new SimpleProtocol())),
+ RPCNetworkParams().setSlobrokConfig("file:slobrok.cfg"),
+ "file:routing.cfg");
+ EXPECT_TRUE(waitSlobrok(mb, "server/cpp/1/A/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/cpp/2/A/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/cpp/2/B/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/A/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/B/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/C/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/D/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/java/1/A/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/java/2/A/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/java/2/B/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/java/3/A/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/java/3/B/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/java/3/C/session"));
+ EXPECT_TRUE(waitSlobrok(mb, "server/java/3/D/session"));
+
+ TraceNode e3 = TraceNode()
+ .addChild(TraceNode().addChild("server/cpp/3/A (message)").addChild("server/cpp/3/A (reply)"))
+ .addChild(TraceNode().addChild("server/cpp/3/B (message)").addChild("server/cpp/3/B (reply)"))
+ .addChild(TraceNode().addChild("server/cpp/3/C (message)").addChild("server/cpp/3/C (reply)"))
+ .addChild(TraceNode().addChild("server/cpp/3/D (message)").addChild("server/cpp/3/D (reply)"))
+ .addChild(TraceNode().addChild("server/java/3/A (message)").addChild("server/java/3/A (reply)"))
+ .addChild(TraceNode().addChild("server/java/3/B (message)").addChild("server/java/3/B (reply)"))
+ .addChild(TraceNode().addChild("server/java/3/C (message)").addChild("server/java/3/C (reply)"))
+ .addChild(TraceNode().addChild("server/java/3/D (message)").addChild("server/java/3/D (reply)")).setStrict(false);
+ TraceNode e2 = TraceNode()
+ .addChild(TraceNode().addChild("server/cpp/2/A (message)").addChild(e3).addChild("server/cpp/2/A (reply)"))
+ .addChild(TraceNode().addChild("server/cpp/2/B (message)").addChild(e3).addChild("server/cpp/2/B (reply)"))
+ .addChild(TraceNode().addChild("server/java/2/A (message)").addChild(e3).addChild("server/java/2/A (reply)"))
+ .addChild(TraceNode().addChild("server/java/2/B (message)").addChild(e3).addChild("server/java/2/B (reply)")).setStrict(false);
+ TraceNode expect = TraceNode()
+ .addChild(TraceNode().addChild("server/cpp/1/A (message)").addChild(e2).addChild("server/cpp/1/A (reply)"))
+ .addChild(TraceNode().addChild("server/java/1/A (message)").addChild(e2).addChild("server/java/1/A (reply)")).setStrict(false);
+ expect.normalize();
+
+ Receptor src;
+ Reply::UP reply;
+ SourceSession::UP ss = mb.getMessageBus().createSourceSession(src, SourceSessionParams());
+ for (int i = 0; i < 50; ++i) {
+ Message::UP msg(new SimpleMessage("test"));
+ msg->getTrace().setLevel(1);
+ ss->send(std::move(msg), "test");
+ reply = src.getReply(10);
+ if (reply.get() != NULL) {
+ reply->getTrace().getRoot().normalize();
+ // resending breaks the trace, so retry until it has expected form
+ if (!reply->hasErrors() && reply->getTrace().getRoot().encode() == expect.encode()) {
+ break;
+ }
+ }
+ std::cout << "Attempt " << i << " got errors, retrying in 1 second.." << std::endl;
+ FastOS_Thread::Sleep(1000);
+ }
+
+ EXPECT_TRUE(!reply->hasErrors());
+ EXPECT_EQUAL(reply->getTrace().getRoot().encode(), expect.encode());
+ EXPECT_TRUE(system("sh ctl.sh stop all") == 0);
+ TEST_DONE();
+}
diff --git a/messagebus_test/src/tests/trace/trace_test.sh b/messagebus_test/src/tests/trace/trace_test.sh
new file mode 100644
index 00000000000..63005385332
--- /dev/null
+++ b/messagebus_test/src/tests/trace/trace_test.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -e
+
+. ../../binref/env.sh
+
+$BINREF/compilejava JavaServer.java
+
+./messagebus_test_trace_test_app