From 4b194fbcf79d5af1731d960ce4abbc2574ba397d Mon Sep 17 00:00:00 2001 From: Arnstein Ressem Date: Wed, 14 Sep 2016 08:45:26 +0200 Subject: Revert "Revert "Aressem/move messagebus test tests to separate module"" --- messagebus_test/src/tests/trace/cpp-server.cpp | 90 ++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 messagebus_test/src/tests/trace/cpp-server.cpp (limited to 'messagebus_test/src/tests/trace/cpp-server.cpp') diff --git a/messagebus_test/src/tests/trace/cpp-server.cpp b/messagebus_test/src/tests/trace/cpp-server.cpp new file mode 100644 index 00000000000..76e20bc3cfd --- /dev/null +++ b/messagebus_test/src/tests/trace/cpp-server.cpp @@ -0,0 +1,90 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include +#include +LOG_SETUP("cpp-server"); +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace mbus; + +class Server : public IMessageHandler, + public IReplyHandler +{ +private: + IntermediateSession::UP _session; + std::string _name; +public: + Server(MessageBus &bus, const std::string &name); + ~Server(); + void handleMessage(Message::UP msg); + void handleReply(Reply::UP reply); +}; + +Server::Server(MessageBus &bus, const std::string &name) + : _session(bus.createIntermediateSession("session", true, *this, *this)), + _name(name) +{ + fprintf(stderr, "cpp server started: %s\n", _name.c_str()); +} + +Server::~Server() +{ + _session.reset(); +} + +void +Server::handleMessage(Message::UP msg) { + msg->getTrace().trace(1, _name + " (message)", false); + if (!msg->getRoute().hasHops()) { + fprintf(stderr, "**** Server '%s' replying.\n", _name.c_str()); + Reply::UP reply(new EmptyReply()); + msg->swapState(*reply); + handleReply(std::move(reply)); + } else { + fprintf(stderr, "**** Server '%s' forwarding message.\n", _name.c_str()); + _session->forward(std::move(msg)); + } +} + +void +Server::handleReply(Reply::UP reply) { + reply->getTrace().trace(1, _name + " (reply)", false); + _session->forward(std::move(reply)); +} + +class App : public FastOS_Application +{ +public: + int Main(); +}; + +int +App::Main() +{ + if (_argc != 2) { + fprintf(stderr, "usage: %s \n", _argv[0]); + return 1; + } + RPCMessageBus mb(ProtocolSet().add(IProtocol::SP(new SimpleProtocol())), + RPCNetworkParams() + .setIdentity(Identity(_argv[1])) + .setSlobrokConfig("file:slobrok.cfg"), + "file:routing.cfg"); + Server server(mb.getMessageBus(), _argv[1]); + while (true) { + FastOS_Thread::Sleep(1000); + } + return 0; +} + +int main(int argc, char **argv) { + App app; + return app.Entry(argc, argv); +} -- cgit v1.2.3