diff options
Diffstat (limited to 'messagebus_test/src/tests/speed/cpp-client.cpp')
-rw-r--r-- | messagebus_test/src/tests/speed/cpp-client.cpp | 146 |
1 files changed, 146 insertions, 0 deletions
diff --git a/messagebus_test/src/tests/speed/cpp-client.cpp b/messagebus_test/src/tests/speed/cpp-client.cpp new file mode 100644 index 00000000000..c0c9d621a20 --- /dev/null +++ b/messagebus_test/src/tests/speed/cpp-client.cpp @@ -0,0 +1,146 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("cpp-client"); +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/routing/retrytransienterrorspolicy.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/sourcesession.h> +#include <vespa/messagebus/sourcesessionparams.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/vespalib/util/sync.h> + +using namespace mbus; + +class Client : public IReplyHandler +{ +private: + vespalib::Lock _lock; + uint32_t _okCnt; + uint32_t _failCnt; + SourceSession::UP _session; + static uint64_t _seq; +public: + Client(MessageBus &bus, const SourceSessionParams ¶ms); + ~Client(); + void send(); + void send(uint64_t seq); + void sample(uint32_t &okCnt, uint32_t &failCnt); + void handleReply(Reply::UP reply); +}; +uint64_t Client::_seq = 100000; + +Client::Client(MessageBus &bus, const SourceSessionParams ¶ms) + : _lock(), + _okCnt(0), + _failCnt(0), + _session(bus.createSourceSession(*this, params)) +{ +} + +Client::~Client() +{ + _session->close(); +} + +void +Client::send() { + send(++_seq); +} + +void +Client::send(uint64_t seq) { + Message::UP msg(new SimpleMessage("message", true, seq)); + _session->send(std::move(msg), "test"); +} + +void +Client::sample(uint32_t &okCnt, uint32_t &failCnt) { + vespalib::LockGuard guard(_lock); + okCnt = _okCnt; + failCnt = _failCnt; +} + +void +Client::handleReply(Reply::UP reply) { + if ((reply->getProtocol() == SimpleProtocol::NAME) + && (reply->getType() == SimpleProtocol::REPLY) + && (static_cast<SimpleReply&>(*reply).getValue() == "OK")) + { + vespalib::LockGuard guard(_lock); + ++_okCnt; + } else { + fprintf(stderr, "BAD REPLY\n"); + for (uint32_t i = 0; i < reply->getNumErrors(); ++i) { + fprintf(stderr, "ERR[%d]: code=%d, msg=%s\n", i, + reply->getError(i).getCode(), + reply->getError(i).getMessage().c_str()); + } + vespalib::LockGuard guard(_lock); + ++_failCnt; + } + send(); +} + +class App : public FastOS_Application +{ +public: + int Main(); +}; + +int +App::Main() +{ + RetryTransientErrorsPolicy::SP retryPolicy(new RetryTransientErrorsPolicy()); + retryPolicy->setBaseDelay(0.1); + RPCMessageBus mb(MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams().setIdentity(Identity("server/cpp")).setSlobrokConfig("file:slobrok.cfg"), + "file:routing.cfg"); + Client client(mb.getMessageBus(), SourceSessionParams().setTimeout(30)); + + // let the system 'warm up' + FastOS_Thread::Sleep(5000); + + // inject messages into the feedback loop + for (uint32_t i = 0; i < 1024; ++i) { + client.send(i); + } + + // let the system 'warm up' + FastOS_Thread::Sleep(5000); + + FastOS_Time start; + FastOS_Time stop; + uint32_t okBefore = 0; + uint32_t okAfter = 0; + uint32_t failBefore = 0; + uint32_t failAfter = 0; + + start.SetNow(); + client.sample(okBefore, failBefore); + FastOS_Thread::Sleep(10000); // Benchmark time + stop.SetNow(); + client.sample(okAfter, failAfter); + stop -= start; + double time = stop.MilliSecs(); + double msgCnt = (double)(okAfter - okBefore); + double throughput = (msgCnt / time) * 1000.0; + fprintf(stdout, "CPP-CLIENT: %g msg/s\n", throughput); + if (failAfter > failBefore) { + fprintf(stderr, "CPP-CLIENT: FAILED (%d -> %d)\n", failBefore, failAfter); + return 1; + } + return 0; +} + +int main(int argc, char **argv) { + fprintf(stderr, "started '%s'\n", argv[0]); + fflush(stderr); + App app; + int r = app.Entry(argc, argv); + fprintf(stderr, "stopping '%s'\n", argv[0]); + fflush(stderr); + return r; +} |