summaryrefslogtreecommitdiffstats
path: root/messagebus_test/src/tests/speed
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus_test/src/tests/speed')
-rw-r--r--messagebus_test/src/tests/speed/.gitignore15
-rw-r--r--messagebus_test/src/tests/speed/CMakeLists.txt17
-rw-r--r--messagebus_test/src/tests/speed/DESC4
-rw-r--r--messagebus_test/src/tests/speed/FILES8
-rw-r--r--messagebus_test/src/tests/speed/JavaClient.java137
-rw-r--r--messagebus_test/src/tests/speed/JavaServer.java54
-rw-r--r--messagebus_test/src/tests/speed/cpp-client.cpp146
-rw-r--r--messagebus_test/src/tests/speed/cpp-server.cpp77
-rwxr-xr-xmessagebus_test/src/tests/speed/ctl.sh4
-rw-r--r--messagebus_test/src/tests/speed/progdefs.sh3
-rw-r--r--messagebus_test/src/tests/speed/routing-template.cfg11
-rw-r--r--messagebus_test/src/tests/speed/speed.cpp51
-rw-r--r--messagebus_test/src/tests/speed/speed_test.sh9
13 files changed, 536 insertions, 0 deletions
diff --git a/messagebus_test/src/tests/speed/.gitignore b/messagebus_test/src/tests/speed/.gitignore
new file mode 100644
index 00000000000..326da75ebb6
--- /dev/null
+++ b/messagebus_test/src/tests/speed/.gitignore
@@ -0,0 +1,15 @@
+*.class
+.depend
+Makefile
+cpp-client
+cpp-server
+out.*
+pid.*
+routing.cfg
+slobrok.cfg
+speed_test
+cpp-client-speed
+cpp-server-speed
+messagebus_test_speed_test_app
+messagebus_test_cpp-client-speed_app
+messagebus_test_cpp-server-speed_app
diff --git a/messagebus_test/src/tests/speed/CMakeLists.txt b/messagebus_test/src/tests/speed/CMakeLists.txt
new file mode 100644
index 00000000000..8e1018ec07c
--- /dev/null
+++ b/messagebus_test/src/tests/speed/CMakeLists.txt
@@ -0,0 +1,17 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(messagebus_test_speed_test_app
+ SOURCES
+ speed.cpp
+ DEPENDS
+)
+vespa_add_test(NAME messagebus_test_speed_test_app COMMAND messagebus_test_speed_test_app BENCHMARK)
+vespa_add_executable(messagebus_test_cpp-server-speed_app
+ SOURCES
+ cpp-server.cpp
+ DEPENDS
+)
+vespa_add_executable(messagebus_test_cpp-client-speed_app
+ SOURCES
+ cpp-client.cpp
+ DEPENDS
+)
diff --git a/messagebus_test/src/tests/speed/DESC b/messagebus_test/src/tests/speed/DESC
new file mode 100644
index 00000000000..10734957438
--- /dev/null
+++ b/messagebus_test/src/tests/speed/DESC
@@ -0,0 +1,4 @@
+This is a simple test that gives a rough idea of the inherent overhead
+in messagebus. It sends simple messages back and forth with the
+simplest routing setup possible. This test also tests that messagebus
+works across Java and C++.
diff --git a/messagebus_test/src/tests/speed/FILES b/messagebus_test/src/tests/speed/FILES
new file mode 100644
index 00000000000..09f0a5ec1d3
--- /dev/null
+++ b/messagebus_test/src/tests/speed/FILES
@@ -0,0 +1,8 @@
+speed.cpp
+out.server.cpp
+out.server.java
+cpp-client.cpp
+cpp-server.cpp
+JavaClient.java
+JavaServer.java
+routing-template.cfg
diff --git a/messagebus_test/src/tests/speed/JavaClient.java b/messagebus_test/src/tests/speed/JavaClient.java
new file mode 100644
index 00000000000..b905ab07e91
--- /dev/null
+++ b/messagebus_test/src/tests/speed/JavaClient.java
@@ -0,0 +1,137 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.test.*;
+import com.yahoo.config.*;
+import com.yahoo.messagebus.routing.*;
+import com.yahoo.messagebus.network.*;
+import com.yahoo.messagebus.network.rpc.*;
+import java.util.Arrays;
+import java.util.logging.*;
+
+public class JavaClient implements ReplyHandler {
+
+ private static Logger log = Logger.getLogger(JavaClient.class.getName());
+
+ private static class Counts {
+ public int okCnt = 0;
+ public int failCnt = 0;
+ Counts() {}
+ Counts(int okCnt, int failCnt) {
+ this.okCnt = okCnt;
+ this.failCnt = failCnt;
+ }
+ }
+
+ private SourceSession session;
+ private Counts counts = new Counts();
+ private static long mySeq = 100000;
+
+ public JavaClient(RPCMessageBus mb) {
+ session = mb.getMessageBus().createSourceSession(this, new SourceSessionParams().setTimeout(30));
+ }
+
+ public synchronized Counts sample() {
+ return new Counts(counts.okCnt, counts.failCnt);
+ }
+
+ public void send() {
+ send(++mySeq);
+ }
+
+ public void send(long seq) {
+ session.send(new MyMessage(seq), "test");
+ }
+
+ public void handleReply(Reply reply) {
+ if ((reply.getProtocol() == SimpleProtocol.NAME)
+ && (reply.getType() == SimpleProtocol.REPLY)
+ && (((SimpleReply)reply).getValue().equals("OK")))
+ {
+ synchronized (this) {
+ counts.okCnt++;
+ }
+ } else {
+ synchronized (this) {
+ counts.failCnt++;
+ }
+ }
+ try {
+ send();
+ } catch (IllegalStateException ignore) {} // handle paranoia for shutdown source sessions
+ }
+
+ public void shutdown() {
+ session.destroy();
+ }
+
+ public static void main(String[] args) {
+ try {
+ RPCMessageBus mb = new RPCMessageBus(
+ new MessageBusParams()
+ .setRetryPolicy(new RetryTransientErrorsPolicy().setBaseDelay(0.1))
+ .addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams()
+ .setIdentity(new Identity("server/java"))
+ .setSlobrokConfigId("file:slobrok.cfg"),
+ "file:routing.cfg");
+ JavaClient client = new JavaClient(mb);
+
+ // let the system 'warm up'
+ Thread.sleep(5000);
+
+ // inject messages into the feedback loop
+ for (int i = 0; i < 1024; ++i) {
+ client.send(i);
+ }
+
+ // let the system 'warm up'
+ Thread.sleep(5000);
+
+ long start;
+ long stop;
+ Counts before;
+ Counts after;
+
+ start = System.currentTimeMillis();
+ before = client.sample();
+ Thread.sleep(10000); // Benchmark time
+ stop = System.currentTimeMillis();
+ after = client.sample();
+ stop -= start;
+ double time = (double)stop;
+ double msgCnt = (double)(after.okCnt - before.okCnt);
+ double throughput = (msgCnt / time) * 1000.0;
+ System.out.printf("JAVA-CLIENT: %g msg/s\n", throughput);
+ client.shutdown();
+ mb.destroy();
+ if (after.failCnt > before.failCnt) {
+ System.err.printf("JAVA-CLIENT: FAILED (%d -> %d)\n",
+ before.failCnt, after.failCnt);
+ System.exit(1);
+ }
+ } catch (Exception e) {
+ log.log(Level.SEVERE, "JAVA-CLIENT: Failed", e);
+ System.exit(1);
+ }
+ }
+
+ private static class MyMessage extends SimpleMessage {
+
+ final long seqId;
+
+ MyMessage(long seqId) {
+ super("message");
+ this.seqId = seqId;
+ }
+
+ @Override
+ public boolean hasSequenceId() {
+ return true;
+ }
+
+ @Override
+ public long getSequenceId() {
+ return seqId;
+ }
+ }
+}
diff --git a/messagebus_test/src/tests/speed/JavaServer.java b/messagebus_test/src/tests/speed/JavaServer.java
new file mode 100644
index 00000000000..afec6dcdba2
--- /dev/null
+++ b/messagebus_test/src/tests/speed/JavaServer.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.test.*;
+import com.yahoo.config.*;
+import com.yahoo.messagebus.routing.*;
+import com.yahoo.messagebus.network.*;
+import com.yahoo.messagebus.network.rpc.*;
+import java.util.Arrays;
+import java.util.logging.*;
+
+public class JavaServer implements MessageHandler {
+
+ private static Logger log = Logger.getLogger(JavaServer.class.getName());
+
+ private DestinationSession session;
+
+ public JavaServer(RPCMessageBus mb) {
+ session = mb.getMessageBus().createDestinationSession("session", true, this);
+ }
+
+ public void handleMessage(Message msg) {
+ if ((msg.getProtocol() == SimpleProtocol.NAME)
+ && (msg.getType() == SimpleProtocol.MESSAGE)
+ && (((SimpleMessage)msg).getValue().equals("message")))
+ {
+ Reply reply = new SimpleReply("OK");
+ msg.swapState(reply);
+ session.reply(reply);
+ } else {
+ Reply reply = new SimpleReply("FAIL");
+ msg.swapState(reply);
+ session.reply(reply);
+ }
+ }
+
+ public static void main(String[] args) {
+ try {
+ RPCMessageBus mb = new RPCMessageBus(
+ Arrays.asList((Protocol)new SimpleProtocol()),
+ new RPCNetworkParams()
+ .setIdentity(new Identity("server/java"))
+ .setSlobrokConfigId("file:slobrok.cfg"),
+ "file:routing.cfg");
+ JavaServer server = new JavaServer(mb);
+ System.out.println("java server started");
+ while (true) {
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ log.log(Level.SEVERE, "JAVA-SERVER: Failed", e);
+ System.exit(1);
+ }
+ }
+}
diff --git a/messagebus_test/src/tests/speed/cpp-client.cpp b/messagebus_test/src/tests/speed/cpp-client.cpp
new file mode 100644
index 00000000000..c0c9d621a20
--- /dev/null
+++ b/messagebus_test/src/tests/speed/cpp-client.cpp
@@ -0,0 +1,146 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("cpp-client");
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/sourcesession.h>
+#include <vespa/messagebus/sourcesessionparams.h>
+#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <vespa/messagebus/testlib/simplereply.h>
+#include <vespa/vespalib/util/sync.h>
+
+using namespace mbus;
+
+class Client : public IReplyHandler
+{
+private:
+ vespalib::Lock _lock;
+ uint32_t _okCnt;
+ uint32_t _failCnt;
+ SourceSession::UP _session;
+ static uint64_t _seq;
+public:
+ Client(MessageBus &bus, const SourceSessionParams &params);
+ ~Client();
+ void send();
+ void send(uint64_t seq);
+ void sample(uint32_t &okCnt, uint32_t &failCnt);
+ void handleReply(Reply::UP reply);
+};
+uint64_t Client::_seq = 100000;
+
+Client::Client(MessageBus &bus, const SourceSessionParams &params)
+ : _lock(),
+ _okCnt(0),
+ _failCnt(0),
+ _session(bus.createSourceSession(*this, params))
+{
+}
+
+Client::~Client()
+{
+ _session->close();
+}
+
+void
+Client::send() {
+ send(++_seq);
+}
+
+void
+Client::send(uint64_t seq) {
+ Message::UP msg(new SimpleMessage("message", true, seq));
+ _session->send(std::move(msg), "test");
+}
+
+void
+Client::sample(uint32_t &okCnt, uint32_t &failCnt) {
+ vespalib::LockGuard guard(_lock);
+ okCnt = _okCnt;
+ failCnt = _failCnt;
+}
+
+void
+Client::handleReply(Reply::UP reply) {
+ if ((reply->getProtocol() == SimpleProtocol::NAME)
+ && (reply->getType() == SimpleProtocol::REPLY)
+ && (static_cast<SimpleReply&>(*reply).getValue() == "OK"))
+ {
+ vespalib::LockGuard guard(_lock);
+ ++_okCnt;
+ } else {
+ fprintf(stderr, "BAD REPLY\n");
+ for (uint32_t i = 0; i < reply->getNumErrors(); ++i) {
+ fprintf(stderr, "ERR[%d]: code=%d, msg=%s\n", i,
+ reply->getError(i).getCode(),
+ reply->getError(i).getMessage().c_str());
+ }
+ vespalib::LockGuard guard(_lock);
+ ++_failCnt;
+ }
+ send();
+}
+
+class App : public FastOS_Application
+{
+public:
+ int Main();
+};
+
+int
+App::Main()
+{
+ RetryTransientErrorsPolicy::SP retryPolicy(new RetryTransientErrorsPolicy());
+ retryPolicy->setBaseDelay(0.1);
+ RPCMessageBus mb(MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(IProtocol::SP(new SimpleProtocol())),
+ RPCNetworkParams().setIdentity(Identity("server/cpp")).setSlobrokConfig("file:slobrok.cfg"),
+ "file:routing.cfg");
+ Client client(mb.getMessageBus(), SourceSessionParams().setTimeout(30));
+
+ // let the system 'warm up'
+ FastOS_Thread::Sleep(5000);
+
+ // inject messages into the feedback loop
+ for (uint32_t i = 0; i < 1024; ++i) {
+ client.send(i);
+ }
+
+ // let the system 'warm up'
+ FastOS_Thread::Sleep(5000);
+
+ FastOS_Time start;
+ FastOS_Time stop;
+ uint32_t okBefore = 0;
+ uint32_t okAfter = 0;
+ uint32_t failBefore = 0;
+ uint32_t failAfter = 0;
+
+ start.SetNow();
+ client.sample(okBefore, failBefore);
+ FastOS_Thread::Sleep(10000); // Benchmark time
+ stop.SetNow();
+ client.sample(okAfter, failAfter);
+ stop -= start;
+ double time = stop.MilliSecs();
+ double msgCnt = (double)(okAfter - okBefore);
+ double throughput = (msgCnt / time) * 1000.0;
+ fprintf(stdout, "CPP-CLIENT: %g msg/s\n", throughput);
+ if (failAfter > failBefore) {
+ fprintf(stderr, "CPP-CLIENT: FAILED (%d -> %d)\n", failBefore, failAfter);
+ return 1;
+ }
+ return 0;
+}
+
+int main(int argc, char **argv) {
+ fprintf(stderr, "started '%s'\n", argv[0]);
+ fflush(stderr);
+ App app;
+ int r = app.Entry(argc, argv);
+ fprintf(stderr, "stopping '%s'\n", argv[0]);
+ fflush(stderr);
+ return r;
+}
diff --git a/messagebus_test/src/tests/speed/cpp-server.cpp b/messagebus_test/src/tests/speed/cpp-server.cpp
new file mode 100644
index 00000000000..c2cd9bf262a
--- /dev/null
+++ b/messagebus_test/src/tests/speed/cpp-server.cpp
@@ -0,0 +1,77 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("cpp-server");
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/destinationsession.h>
+#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simplereply.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/iprotocol.h>
+#include <vespa/messagebus/protocolset.h>
+
+using namespace mbus;
+
+class Server : public IMessageHandler
+{
+private:
+ DestinationSession::UP _session;
+public:
+ Server(MessageBus &bus);
+ ~Server();
+ void handleMessage(Message::UP msg);
+};
+
+Server::Server(MessageBus &bus)
+ : _session(bus.createDestinationSession("session", true, *this))
+{
+ fprintf(stderr, "cpp server started\n");
+}
+
+Server::~Server()
+{
+ _session.reset();
+}
+
+void
+Server::handleMessage(Message::UP msg) {
+ if ((msg->getProtocol() == SimpleProtocol::NAME)
+ && (msg->getType() == SimpleProtocol::MESSAGE)
+ && (static_cast<SimpleMessage&>(*msg).getValue() == "message"))
+ {
+ Reply::UP reply(new SimpleReply("OK"));
+ msg->swapState(*reply);
+ _session->reply(std::move(reply));
+ } else {
+ Reply::UP reply(new SimpleReply("FAIL"));
+ msg->swapState(*reply);
+ _session->reply(std::move(reply));
+ }
+}
+
+class App : public FastOS_Application
+{
+public:
+ int Main();
+};
+
+int
+App::Main()
+{
+ RPCMessageBus mb(ProtocolSet().add(IProtocol::SP(new SimpleProtocol())),
+ RPCNetworkParams()
+ .setIdentity(Identity("server/cpp"))
+ .setSlobrokConfig("file:slobrok.cfg"),
+ "file:routing.cfg");
+ Server server(mb.getMessageBus());
+ while (true) {
+ FastOS_Thread::Sleep(1000);
+ }
+ return 0;
+}
+
+int main(int argc, char **argv) {
+ App app;
+ return app.Entry(argc, argv);
+}
diff --git a/messagebus_test/src/tests/speed/ctl.sh b/messagebus_test/src/tests/speed/ctl.sh
new file mode 100755
index 00000000000..864be4290ed
--- /dev/null
+++ b/messagebus_test/src/tests/speed/ctl.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+exec ../../binref/progctl.sh progdefs.sh "$@"
diff --git a/messagebus_test/src/tests/speed/progdefs.sh b/messagebus_test/src/tests/speed/progdefs.sh
new file mode 100644
index 00000000000..4e0390142cf
--- /dev/null
+++ b/messagebus_test/src/tests/speed/progdefs.sh
@@ -0,0 +1,3 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+prog server cpp "" "./messagebus_test_cpp-server-speed_app"
+prog server java "" "../../binref/runjava JavaServer"
diff --git a/messagebus_test/src/tests/speed/routing-template.cfg b/messagebus_test/src/tests/speed/routing-template.cfg
new file mode 100644
index 00000000000..4b938c9cc82
--- /dev/null
+++ b/messagebus_test/src/tests/speed/routing-template.cfg
@@ -0,0 +1,11 @@
+routingtable[1]
+routingtable[0].protocol "Simple"
+routingtable[0].hop[1]
+routingtable[0].hop[0].name "server"
+routingtable[0].hop[0].selector "server/session"
+routingtable[0].hop[0].recipient[1]
+routingtable[0].hop[0].recipient[0] "server/session"
+routingtable[0].route[1]
+routingtable[0].route[0].name "test"
+routingtable[0].route[0].hop[1]
+routingtable[0].route[0].hop[0] "server"
diff --git a/messagebus_test/src/tests/speed/speed.cpp b/messagebus_test/src/tests/speed/speed.cpp
new file mode 100644
index 00000000000..31ea419ce5c
--- /dev/null
+++ b/messagebus_test/src/tests/speed/speed.cpp
@@ -0,0 +1,51 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("speed_test");
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/vespalib/util/stringfmt.h>
+
+using namespace mbus;
+using vespalib::make_string;
+
+TEST_SETUP(Test);
+
+int
+Test::Main()
+{
+ TEST_INIT("speed_test");
+ Slobrok slobrok;
+ { // Make slobrok config
+ EXPECT_EQUAL(system("echo slobrok[1] > slobrok.cfg"), 0);
+ EXPECT_EQUAL(system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' "
+ ">> slobrok.cfg", slobrok.port()).c_str()), 0);
+ }
+ { // CPP SERVER
+ { // Make routing config
+ EXPECT_EQUAL(system("cat routing-template.cfg | sed 's#session#cpp/session#' > routing.cfg"), 0);
+ }
+ fprintf(stderr, "STARTING CPP-SERVER\n");
+ EXPECT_EQUAL(system("sh ctl.sh start server cpp"), 0);
+ fprintf(stderr, "STARTING CPP-CLIENT\n");
+ EXPECT_EQUAL(system("./messagebus_test_cpp-client-speed_app"), 0);
+ fprintf(stderr, "STARTING JAVA-CLIENT\n");
+ EXPECT_EQUAL(system("../../binref/runjava JavaClient"), 0);
+ fprintf(stderr, "STOPPING\n");
+ EXPECT_EQUAL(system("sh ctl.sh stop server cpp"), 0);
+ }
+ { // JAVA SERVER
+ { // Make routing config
+ EXPECT_EQUAL(system("cat routing-template.cfg | sed 's#session#java/session#' > routing.cfg"), 0);
+ }
+ fprintf(stderr, "STARTING JAVA-SERVER\n");
+ EXPECT_EQUAL(system("sh ctl.sh start server java"), 0);
+ fprintf(stderr, "STARTING CPP-CLIENT\n");
+ EXPECT_EQUAL(system("./messagebus_test_cpp-client-speed_app"), 0);
+ fprintf(stderr, "STARTING JAVA-CLIENT\n");
+ EXPECT_EQUAL(system("../../binref/runjava JavaClient"), 0);
+ fprintf(stderr, "STOPPING\n");
+ EXPECT_EQUAL(system("sh ctl.sh stop server java"), 0);
+ }
+ TEST_DONE();
+}
diff --git a/messagebus_test/src/tests/speed/speed_test.sh b/messagebus_test/src/tests/speed/speed_test.sh
new file mode 100644
index 00000000000..77eb7e63e0c
--- /dev/null
+++ b/messagebus_test/src/tests/speed/speed_test.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+set -e
+
+. ../../binref/env.sh
+
+$BINREF/compilejava JavaServer.java
+$BINREF/compilejava JavaClient.java
+
+(ulimit -c; ulimit -H -c; ulimit -c unlimited; ./messagebus_test_speed_test_app)