// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include LOG_SETUP("context_test"); #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace mbus; struct Handler : public IMessageHandler { DestinationSession::UP session; Handler(MessageBus &mb) : session() { session = mb.createDestinationSession("session", true, *this); } ~Handler() { session.reset(); } virtual void handleMessage(Message::UP msg) { session->acknowledge(std::move(msg)); } }; RoutingSpec getRouting() { return RoutingSpec() .addTable(RoutingTableSpec("Simple") .addHop(HopSpec("test", "test/session")) .addRoute(RouteSpec("test").addHop("test"))); } TEST_SETUP(Test); int Test::Main() { TEST_INIT("context_test"); Slobrok slobrok; TestServer src(Identity(""), getRouting(), slobrok); TestServer dst(Identity("test"), getRouting(), slobrok); Handler handler(dst.mb); ASSERT_TRUE(src.waitSlobrok("test/session")); RoutableQueue queue; SourceSessionParams params; params.setThrottlePolicy(IThrottlePolicy::SP()); SourceSession::UP ss = src.mb.createSourceSession(queue, params); { Message::UP msg(new SimpleMessage("test", true, 1)); msg->setContext(Context((uint64_t)10)); ss->send(std::move(msg), "test"); } { Message::UP msg(new SimpleMessage("test", true, 1)); msg->setContext(Context((uint64_t)20)); ss->send(std::move(msg), "test"); } { Message::UP msg(new SimpleMessage("test", true, 1)); msg->setContext(Context((uint64_t)30)); ss->send(std::move(msg), "test"); } for (uint32_t i = 0; i < 1000; ++i) { if (queue.size() == 3) { break; } FastOS_Thread::Sleep(10); } EXPECT_EQUAL(queue.size(), 3u); { Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); ASSERT_TRUE(reply.get() != 0); EXPECT_EQUAL(reply->getContext().value.UINT64, 10u); } { Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); ASSERT_TRUE(reply.get() != 0); EXPECT_EQUAL(reply->getContext().value.UINT64, 20u); } { Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); ASSERT_TRUE(reply.get() != 0); EXPECT_EQUAL(reply->getContext().value.UINT64, 30u); } TEST_DONE(); }