diff options
Diffstat (limited to 'messagebus_test/src/tests/speed')
-rw-r--r-- | messagebus_test/src/tests/speed/.gitignore | 15 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/CMakeLists.txt | 17 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/DESC | 4 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/FILES | 8 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/JavaClient.java | 137 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/JavaServer.java | 54 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/cpp-client.cpp | 146 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/cpp-server.cpp | 77 | ||||
-rwxr-xr-x | messagebus_test/src/tests/speed/ctl.sh | 4 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/progdefs.sh | 3 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/routing-template.cfg | 11 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/speed.cpp | 51 | ||||
-rw-r--r-- | messagebus_test/src/tests/speed/speed_test.sh | 9 |
13 files changed, 536 insertions, 0 deletions
diff --git a/messagebus_test/src/tests/speed/.gitignore b/messagebus_test/src/tests/speed/.gitignore new file mode 100644 index 00000000000..326da75ebb6 --- /dev/null +++ b/messagebus_test/src/tests/speed/.gitignore @@ -0,0 +1,15 @@ +*.class +.depend +Makefile +cpp-client +cpp-server +out.* +pid.* +routing.cfg +slobrok.cfg +speed_test +cpp-client-speed +cpp-server-speed +messagebus_test_speed_test_app +messagebus_test_cpp-client-speed_app +messagebus_test_cpp-server-speed_app diff --git a/messagebus_test/src/tests/speed/CMakeLists.txt b/messagebus_test/src/tests/speed/CMakeLists.txt new file mode 100644 index 00000000000..8e1018ec07c --- /dev/null +++ b/messagebus_test/src/tests/speed/CMakeLists.txt @@ -0,0 +1,17 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(messagebus_test_speed_test_app + SOURCES + speed.cpp + DEPENDS +) +vespa_add_test(NAME messagebus_test_speed_test_app COMMAND messagebus_test_speed_test_app BENCHMARK) +vespa_add_executable(messagebus_test_cpp-server-speed_app + SOURCES + cpp-server.cpp + DEPENDS +) +vespa_add_executable(messagebus_test_cpp-client-speed_app + SOURCES + cpp-client.cpp + DEPENDS +) diff --git a/messagebus_test/src/tests/speed/DESC b/messagebus_test/src/tests/speed/DESC new file mode 100644 index 00000000000..10734957438 --- /dev/null +++ b/messagebus_test/src/tests/speed/DESC @@ -0,0 +1,4 @@ +This is a simple test that gives a rough idea of the inherent overhead +in messagebus. It sends simple messages back and forth with the +simplest routing setup possible. This test also tests that messagebus +works across Java and C++. diff --git a/messagebus_test/src/tests/speed/FILES b/messagebus_test/src/tests/speed/FILES new file mode 100644 index 00000000000..09f0a5ec1d3 --- /dev/null +++ b/messagebus_test/src/tests/speed/FILES @@ -0,0 +1,8 @@ +speed.cpp +out.server.cpp +out.server.java +cpp-client.cpp +cpp-server.cpp +JavaClient.java +JavaServer.java +routing-template.cfg diff --git a/messagebus_test/src/tests/speed/JavaClient.java b/messagebus_test/src/tests/speed/JavaClient.java new file mode 100644 index 00000000000..b905ab07e91 --- /dev/null +++ b/messagebus_test/src/tests/speed/JavaClient.java @@ -0,0 +1,137 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.test.*; +import com.yahoo.config.*; +import com.yahoo.messagebus.routing.*; +import com.yahoo.messagebus.network.*; +import com.yahoo.messagebus.network.rpc.*; +import java.util.Arrays; +import java.util.logging.*; + +public class JavaClient implements ReplyHandler { + + private static Logger log = Logger.getLogger(JavaClient.class.getName()); + + private static class Counts { + public int okCnt = 0; + public int failCnt = 0; + Counts() {} + Counts(int okCnt, int failCnt) { + this.okCnt = okCnt; + this.failCnt = failCnt; + } + } + + private SourceSession session; + private Counts counts = new Counts(); + private static long mySeq = 100000; + + public JavaClient(RPCMessageBus mb) { + session = mb.getMessageBus().createSourceSession(this, new SourceSessionParams().setTimeout(30)); + } + + public synchronized Counts sample() { + return new Counts(counts.okCnt, counts.failCnt); + } + + public void send() { + send(++mySeq); + } + + public void send(long seq) { + session.send(new MyMessage(seq), "test"); + } + + public void handleReply(Reply reply) { + if ((reply.getProtocol() == SimpleProtocol.NAME) + && (reply.getType() == SimpleProtocol.REPLY) + && (((SimpleReply)reply).getValue().equals("OK"))) + { + synchronized (this) { + counts.okCnt++; + } + } else { + synchronized (this) { + counts.failCnt++; + } + } + try { + send(); + } catch (IllegalStateException ignore) {} // handle paranoia for shutdown source sessions + } + + public void shutdown() { + session.destroy(); + } + + public static void main(String[] args) { + try { + RPCMessageBus mb = new RPCMessageBus( + new MessageBusParams() + .setRetryPolicy(new RetryTransientErrorsPolicy().setBaseDelay(0.1)) + .addProtocol(new SimpleProtocol()), + new RPCNetworkParams() + .setIdentity(new Identity("server/java")) + .setSlobrokConfigId("file:slobrok.cfg"), + "file:routing.cfg"); + JavaClient client = new JavaClient(mb); + + // let the system 'warm up' + Thread.sleep(5000); + + // inject messages into the feedback loop + for (int i = 0; i < 1024; ++i) { + client.send(i); + } + + // let the system 'warm up' + Thread.sleep(5000); + + long start; + long stop; + Counts before; + Counts after; + + start = System.currentTimeMillis(); + before = client.sample(); + Thread.sleep(10000); // Benchmark time + stop = System.currentTimeMillis(); + after = client.sample(); + stop -= start; + double time = (double)stop; + double msgCnt = (double)(after.okCnt - before.okCnt); + double throughput = (msgCnt / time) * 1000.0; + System.out.printf("JAVA-CLIENT: %g msg/s\n", throughput); + client.shutdown(); + mb.destroy(); + if (after.failCnt > before.failCnt) { + System.err.printf("JAVA-CLIENT: FAILED (%d -> %d)\n", + before.failCnt, after.failCnt); + System.exit(1); + } + } catch (Exception e) { + log.log(Level.SEVERE, "JAVA-CLIENT: Failed", e); + System.exit(1); + } + } + + private static class MyMessage extends SimpleMessage { + + final long seqId; + + MyMessage(long seqId) { + super("message"); + this.seqId = seqId; + } + + @Override + public boolean hasSequenceId() { + return true; + } + + @Override + public long getSequenceId() { + return seqId; + } + } +} diff --git a/messagebus_test/src/tests/speed/JavaServer.java b/messagebus_test/src/tests/speed/JavaServer.java new file mode 100644 index 00000000000..afec6dcdba2 --- /dev/null +++ b/messagebus_test/src/tests/speed/JavaServer.java @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.test.*; +import com.yahoo.config.*; +import com.yahoo.messagebus.routing.*; +import com.yahoo.messagebus.network.*; +import com.yahoo.messagebus.network.rpc.*; +import java.util.Arrays; +import java.util.logging.*; + +public class JavaServer implements MessageHandler { + + private static Logger log = Logger.getLogger(JavaServer.class.getName()); + + private DestinationSession session; + + public JavaServer(RPCMessageBus mb) { + session = mb.getMessageBus().createDestinationSession("session", true, this); + } + + public void handleMessage(Message msg) { + if ((msg.getProtocol() == SimpleProtocol.NAME) + && (msg.getType() == SimpleProtocol.MESSAGE) + && (((SimpleMessage)msg).getValue().equals("message"))) + { + Reply reply = new SimpleReply("OK"); + msg.swapState(reply); + session.reply(reply); + } else { + Reply reply = new SimpleReply("FAIL"); + msg.swapState(reply); + session.reply(reply); + } + } + + public static void main(String[] args) { + try { + RPCMessageBus mb = new RPCMessageBus( + Arrays.asList((Protocol)new SimpleProtocol()), + new RPCNetworkParams() + .setIdentity(new Identity("server/java")) + .setSlobrokConfigId("file:slobrok.cfg"), + "file:routing.cfg"); + JavaServer server = new JavaServer(mb); + System.out.println("java server started"); + while (true) { + Thread.sleep(1000); + } + } catch (Exception e) { + log.log(Level.SEVERE, "JAVA-SERVER: Failed", e); + System.exit(1); + } + } +} diff --git a/messagebus_test/src/tests/speed/cpp-client.cpp b/messagebus_test/src/tests/speed/cpp-client.cpp new file mode 100644 index 00000000000..c0c9d621a20 --- /dev/null +++ b/messagebus_test/src/tests/speed/cpp-client.cpp @@ -0,0 +1,146 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("cpp-client"); +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/routing/retrytransienterrorspolicy.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/sourcesession.h> +#include <vespa/messagebus/sourcesessionparams.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/vespalib/util/sync.h> + +using namespace mbus; + +class Client : public IReplyHandler +{ +private: + vespalib::Lock _lock; + uint32_t _okCnt; + uint32_t _failCnt; + SourceSession::UP _session; + static uint64_t _seq; +public: + Client(MessageBus &bus, const SourceSessionParams ¶ms); + ~Client(); + void send(); + void send(uint64_t seq); + void sample(uint32_t &okCnt, uint32_t &failCnt); + void handleReply(Reply::UP reply); +}; +uint64_t Client::_seq = 100000; + +Client::Client(MessageBus &bus, const SourceSessionParams ¶ms) + : _lock(), + _okCnt(0), + _failCnt(0), + _session(bus.createSourceSession(*this, params)) +{ +} + +Client::~Client() +{ + _session->close(); +} + +void +Client::send() { + send(++_seq); +} + +void +Client::send(uint64_t seq) { + Message::UP msg(new SimpleMessage("message", true, seq)); + _session->send(std::move(msg), "test"); +} + +void +Client::sample(uint32_t &okCnt, uint32_t &failCnt) { + vespalib::LockGuard guard(_lock); + okCnt = _okCnt; + failCnt = _failCnt; +} + +void +Client::handleReply(Reply::UP reply) { + if ((reply->getProtocol() == SimpleProtocol::NAME) + && (reply->getType() == SimpleProtocol::REPLY) + && (static_cast<SimpleReply&>(*reply).getValue() == "OK")) + { + vespalib::LockGuard guard(_lock); + ++_okCnt; + } else { + fprintf(stderr, "BAD REPLY\n"); + for (uint32_t i = 0; i < reply->getNumErrors(); ++i) { + fprintf(stderr, "ERR[%d]: code=%d, msg=%s\n", i, + reply->getError(i).getCode(), + reply->getError(i).getMessage().c_str()); + } + vespalib::LockGuard guard(_lock); + ++_failCnt; + } + send(); +} + +class App : public FastOS_Application +{ +public: + int Main(); +}; + +int +App::Main() +{ + RetryTransientErrorsPolicy::SP retryPolicy(new RetryTransientErrorsPolicy()); + retryPolicy->setBaseDelay(0.1); + RPCMessageBus mb(MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams().setIdentity(Identity("server/cpp")).setSlobrokConfig("file:slobrok.cfg"), + "file:routing.cfg"); + Client client(mb.getMessageBus(), SourceSessionParams().setTimeout(30)); + + // let the system 'warm up' + FastOS_Thread::Sleep(5000); + + // inject messages into the feedback loop + for (uint32_t i = 0; i < 1024; ++i) { + client.send(i); + } + + // let the system 'warm up' + FastOS_Thread::Sleep(5000); + + FastOS_Time start; + FastOS_Time stop; + uint32_t okBefore = 0; + uint32_t okAfter = 0; + uint32_t failBefore = 0; + uint32_t failAfter = 0; + + start.SetNow(); + client.sample(okBefore, failBefore); + FastOS_Thread::Sleep(10000); // Benchmark time + stop.SetNow(); + client.sample(okAfter, failAfter); + stop -= start; + double time = stop.MilliSecs(); + double msgCnt = (double)(okAfter - okBefore); + double throughput = (msgCnt / time) * 1000.0; + fprintf(stdout, "CPP-CLIENT: %g msg/s\n", throughput); + if (failAfter > failBefore) { + fprintf(stderr, "CPP-CLIENT: FAILED (%d -> %d)\n", failBefore, failAfter); + return 1; + } + return 0; +} + +int main(int argc, char **argv) { + fprintf(stderr, "started '%s'\n", argv[0]); + fflush(stderr); + App app; + int r = app.Entry(argc, argv); + fprintf(stderr, "stopping '%s'\n", argv[0]); + fflush(stderr); + return r; +} diff --git a/messagebus_test/src/tests/speed/cpp-server.cpp b/messagebus_test/src/tests/speed/cpp-server.cpp new file mode 100644 index 00000000000..c2cd9bf262a --- /dev/null +++ b/messagebus_test/src/tests/speed/cpp-server.cpp @@ -0,0 +1,77 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("cpp-server"); +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/destinationsession.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/iprotocol.h> +#include <vespa/messagebus/protocolset.h> + +using namespace mbus; + +class Server : public IMessageHandler +{ +private: + DestinationSession::UP _session; +public: + Server(MessageBus &bus); + ~Server(); + void handleMessage(Message::UP msg); +}; + +Server::Server(MessageBus &bus) + : _session(bus.createDestinationSession("session", true, *this)) +{ + fprintf(stderr, "cpp server started\n"); +} + +Server::~Server() +{ + _session.reset(); +} + +void +Server::handleMessage(Message::UP msg) { + if ((msg->getProtocol() == SimpleProtocol::NAME) + && (msg->getType() == SimpleProtocol::MESSAGE) + && (static_cast<SimpleMessage&>(*msg).getValue() == "message")) + { + Reply::UP reply(new SimpleReply("OK")); + msg->swapState(*reply); + _session->reply(std::move(reply)); + } else { + Reply::UP reply(new SimpleReply("FAIL")); + msg->swapState(*reply); + _session->reply(std::move(reply)); + } +} + +class App : public FastOS_Application +{ +public: + int Main(); +}; + +int +App::Main() +{ + RPCMessageBus mb(ProtocolSet().add(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams() + .setIdentity(Identity("server/cpp")) + .setSlobrokConfig("file:slobrok.cfg"), + "file:routing.cfg"); + Server server(mb.getMessageBus()); + while (true) { + FastOS_Thread::Sleep(1000); + } + return 0; +} + +int main(int argc, char **argv) { + App app; + return app.Entry(argc, argv); +} diff --git a/messagebus_test/src/tests/speed/ctl.sh b/messagebus_test/src/tests/speed/ctl.sh new file mode 100755 index 00000000000..864be4290ed --- /dev/null +++ b/messagebus_test/src/tests/speed/ctl.sh @@ -0,0 +1,4 @@ +#!/bin/bash +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +exec ../../binref/progctl.sh progdefs.sh "$@" diff --git a/messagebus_test/src/tests/speed/progdefs.sh b/messagebus_test/src/tests/speed/progdefs.sh new file mode 100644 index 00000000000..4e0390142cf --- /dev/null +++ b/messagebus_test/src/tests/speed/progdefs.sh @@ -0,0 +1,3 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +prog server cpp "" "./messagebus_test_cpp-server-speed_app" +prog server java "" "../../binref/runjava JavaServer" diff --git a/messagebus_test/src/tests/speed/routing-template.cfg b/messagebus_test/src/tests/speed/routing-template.cfg new file mode 100644 index 00000000000..4b938c9cc82 --- /dev/null +++ b/messagebus_test/src/tests/speed/routing-template.cfg @@ -0,0 +1,11 @@ +routingtable[1] +routingtable[0].protocol "Simple" +routingtable[0].hop[1] +routingtable[0].hop[0].name "server" +routingtable[0].hop[0].selector "server/session" +routingtable[0].hop[0].recipient[1] +routingtable[0].hop[0].recipient[0] "server/session" +routingtable[0].route[1] +routingtable[0].route[0].name "test" +routingtable[0].route[0].hop[1] +routingtable[0].route[0].hop[0] "server" diff --git a/messagebus_test/src/tests/speed/speed.cpp b/messagebus_test/src/tests/speed/speed.cpp new file mode 100644 index 00000000000..31ea419ce5c --- /dev/null +++ b/messagebus_test/src/tests/speed/speed.cpp @@ -0,0 +1,51 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("speed_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/vespalib/util/stringfmt.h> + +using namespace mbus; +using vespalib::make_string; + +TEST_SETUP(Test); + +int +Test::Main() +{ + TEST_INIT("speed_test"); + Slobrok slobrok; + { // Make slobrok config + EXPECT_EQUAL(system("echo slobrok[1] > slobrok.cfg"), 0); + EXPECT_EQUAL(system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' " + ">> slobrok.cfg", slobrok.port()).c_str()), 0); + } + { // CPP SERVER + { // Make routing config + EXPECT_EQUAL(system("cat routing-template.cfg | sed 's#session#cpp/session#' > routing.cfg"), 0); + } + fprintf(stderr, "STARTING CPP-SERVER\n"); + EXPECT_EQUAL(system("sh ctl.sh start server cpp"), 0); + fprintf(stderr, "STARTING CPP-CLIENT\n"); + EXPECT_EQUAL(system("./messagebus_test_cpp-client-speed_app"), 0); + fprintf(stderr, "STARTING JAVA-CLIENT\n"); + EXPECT_EQUAL(system("../../binref/runjava JavaClient"), 0); + fprintf(stderr, "STOPPING\n"); + EXPECT_EQUAL(system("sh ctl.sh stop server cpp"), 0); + } + { // JAVA SERVER + { // Make routing config + EXPECT_EQUAL(system("cat routing-template.cfg | sed 's#session#java/session#' > routing.cfg"), 0); + } + fprintf(stderr, "STARTING JAVA-SERVER\n"); + EXPECT_EQUAL(system("sh ctl.sh start server java"), 0); + fprintf(stderr, "STARTING CPP-CLIENT\n"); + EXPECT_EQUAL(system("./messagebus_test_cpp-client-speed_app"), 0); + fprintf(stderr, "STARTING JAVA-CLIENT\n"); + EXPECT_EQUAL(system("../../binref/runjava JavaClient"), 0); + fprintf(stderr, "STOPPING\n"); + EXPECT_EQUAL(system("sh ctl.sh stop server java"), 0); + } + TEST_DONE(); +} diff --git a/messagebus_test/src/tests/speed/speed_test.sh b/messagebus_test/src/tests/speed/speed_test.sh new file mode 100644 index 00000000000..77eb7e63e0c --- /dev/null +++ b/messagebus_test/src/tests/speed/speed_test.sh @@ -0,0 +1,9 @@ +#!/bin/bash +set -e + +. ../../binref/env.sh + +$BINREF/compilejava JavaServer.java +$BINREF/compilejava JavaClient.java + +(ulimit -c; ulimit -H -c; ulimit -c unlimited; ./messagebus_test_speed_test_app) |