aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus_test/src/tests/speed/cpp-client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus_test/src/tests/speed/cpp-client.cpp')
-rw-r--r--messagebus_test/src/tests/speed/cpp-client.cpp146
1 files changed, 146 insertions, 0 deletions
diff --git a/messagebus_test/src/tests/speed/cpp-client.cpp b/messagebus_test/src/tests/speed/cpp-client.cpp
new file mode 100644
index 00000000000..c0c9d621a20
--- /dev/null
+++ b/messagebus_test/src/tests/speed/cpp-client.cpp
@@ -0,0 +1,146 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("cpp-client");
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/sourcesession.h>
+#include <vespa/messagebus/sourcesessionparams.h>
+#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <vespa/messagebus/testlib/simplereply.h>
+#include <vespa/vespalib/util/sync.h>
+
+using namespace mbus;
+
+class Client : public IReplyHandler
+{
+private:
+ vespalib::Lock _lock;
+ uint32_t _okCnt;
+ uint32_t _failCnt;
+ SourceSession::UP _session;
+ static uint64_t _seq;
+public:
+ Client(MessageBus &bus, const SourceSessionParams &params);
+ ~Client();
+ void send();
+ void send(uint64_t seq);
+ void sample(uint32_t &okCnt, uint32_t &failCnt);
+ void handleReply(Reply::UP reply);
+};
+uint64_t Client::_seq = 100000;
+
+Client::Client(MessageBus &bus, const SourceSessionParams &params)
+ : _lock(),
+ _okCnt(0),
+ _failCnt(0),
+ _session(bus.createSourceSession(*this, params))
+{
+}
+
+Client::~Client()
+{
+ _session->close();
+}
+
+void
+Client::send() {
+ send(++_seq);
+}
+
+void
+Client::send(uint64_t seq) {
+ Message::UP msg(new SimpleMessage("message", true, seq));
+ _session->send(std::move(msg), "test");
+}
+
+void
+Client::sample(uint32_t &okCnt, uint32_t &failCnt) {
+ vespalib::LockGuard guard(_lock);
+ okCnt = _okCnt;
+ failCnt = _failCnt;
+}
+
+void
+Client::handleReply(Reply::UP reply) {
+ if ((reply->getProtocol() == SimpleProtocol::NAME)
+ && (reply->getType() == SimpleProtocol::REPLY)
+ && (static_cast<SimpleReply&>(*reply).getValue() == "OK"))
+ {
+ vespalib::LockGuard guard(_lock);
+ ++_okCnt;
+ } else {
+ fprintf(stderr, "BAD REPLY\n");
+ for (uint32_t i = 0; i < reply->getNumErrors(); ++i) {
+ fprintf(stderr, "ERR[%d]: code=%d, msg=%s\n", i,
+ reply->getError(i).getCode(),
+ reply->getError(i).getMessage().c_str());
+ }
+ vespalib::LockGuard guard(_lock);
+ ++_failCnt;
+ }
+ send();
+}
+
+class App : public FastOS_Application
+{
+public:
+ int Main();
+};
+
+int
+App::Main()
+{
+ RetryTransientErrorsPolicy::SP retryPolicy(new RetryTransientErrorsPolicy());
+ retryPolicy->setBaseDelay(0.1);
+ RPCMessageBus mb(MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(IProtocol::SP(new SimpleProtocol())),
+ RPCNetworkParams().setIdentity(Identity("server/cpp")).setSlobrokConfig("file:slobrok.cfg"),
+ "file:routing.cfg");
+ Client client(mb.getMessageBus(), SourceSessionParams().setTimeout(30));
+
+ // let the system 'warm up'
+ FastOS_Thread::Sleep(5000);
+
+ // inject messages into the feedback loop
+ for (uint32_t i = 0; i < 1024; ++i) {
+ client.send(i);
+ }
+
+ // let the system 'warm up'
+ FastOS_Thread::Sleep(5000);
+
+ FastOS_Time start;
+ FastOS_Time stop;
+ uint32_t okBefore = 0;
+ uint32_t okAfter = 0;
+ uint32_t failBefore = 0;
+ uint32_t failAfter = 0;
+
+ start.SetNow();
+ client.sample(okBefore, failBefore);
+ FastOS_Thread::Sleep(10000); // Benchmark time
+ stop.SetNow();
+ client.sample(okAfter, failAfter);
+ stop -= start;
+ double time = stop.MilliSecs();
+ double msgCnt = (double)(okAfter - okBefore);
+ double throughput = (msgCnt / time) * 1000.0;
+ fprintf(stdout, "CPP-CLIENT: %g msg/s\n", throughput);
+ if (failAfter > failBefore) {
+ fprintf(stderr, "CPP-CLIENT: FAILED (%d -> %d)\n", failBefore, failAfter);
+ return 1;
+ }
+ return 0;
+}
+
+int main(int argc, char **argv) {
+ fprintf(stderr, "started '%s'\n", argv[0]);
+ fflush(stderr);
+ App app;
+ int r = app.Entry(argc, argv);
+ fprintf(stderr, "stopping '%s'\n", argv[0]);
+ fflush(stderr);
+ return r;
+}