diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /messagebus/src/tests/context/context.cpp |
Publish
Diffstat (limited to 'messagebus/src/tests/context/context.cpp')
-rw-r--r-- | messagebus/src/tests/context/context.cpp | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/messagebus/src/tests/context/context.cpp b/messagebus/src/tests/context/context.cpp new file mode 100644 index 00000000000..ccb136e7b81 --- /dev/null +++ b/messagebus/src/tests/context/context.cpp @@ -0,0 +1,102 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("context_test"); + +#include <vespa/messagebus/destinationsession.h> +#include <vespa/messagebus/intermediatesession.h> +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/routablequeue.h> +#include <vespa/messagebus/routing/routingspec.h> +#include <vespa/messagebus/sourcesession.h> +#include <vespa/messagebus/sourcesessionparams.h> +#include <vespa/messagebus/testlib/receptor.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/testlib/testserver.h> +#include <vespa/vespalib/testkit/testapp.h> + +using namespace mbus; + +struct Handler : public IMessageHandler +{ + DestinationSession::UP session; + + Handler(MessageBus &mb) : session() { + session = mb.createDestinationSession("session", true, *this); + } + ~Handler() { + session.reset(); + } + virtual void handleMessage(Message::UP msg) { + session->acknowledge(std::move(msg)); + } +}; + +RoutingSpec getRouting() { + return RoutingSpec() + .addTable(RoutingTableSpec("Simple") + .addHop(HopSpec("test", "test/session")) + .addRoute(RouteSpec("test").addHop("test"))); +} + +TEST_SETUP(Test); + +int +Test::Main() +{ + TEST_INIT("context_test"); + + Slobrok slobrok; + TestServer src(Identity(""), getRouting(), slobrok); + TestServer dst(Identity("test"), getRouting(), slobrok); + Handler handler(dst.mb); + + ASSERT_TRUE(src.waitSlobrok("test/session")); + + RoutableQueue queue; + SourceSessionParams params; + params.setThrottlePolicy(IThrottlePolicy::SP()); + SourceSession::UP ss = src.mb.createSourceSession(queue, params); + + { + Message::UP msg(new SimpleMessage("test", true, 1)); + msg->setContext(Context((uint64_t)10)); + ss->send(std::move(msg), "test"); + } + { + Message::UP msg(new SimpleMessage("test", true, 1)); + msg->setContext(Context((uint64_t)20)); + ss->send(std::move(msg), "test"); + } + { + Message::UP msg(new SimpleMessage("test", true, 1)); + msg->setContext(Context((uint64_t)30)); + ss->send(std::move(msg), "test"); + } + for (uint32_t i = 0; i < 1000; ++i) { + if (queue.size() == 3) { + break; + } + FastOS_Thread::Sleep(10); + } + EXPECT_EQUAL(queue.size(), 3u); + { + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); + ASSERT_TRUE(reply.get() != 0); + EXPECT_EQUAL(reply->getContext().value.UINT64, 10u); + } + { + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); + ASSERT_TRUE(reply.get() != 0); + EXPECT_EQUAL(reply->getContext().value.UINT64, 20u); + } + { + Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release()); + ASSERT_TRUE(reply.get() != 0); + EXPECT_EQUAL(reply->getContext().value.UINT64, 30u); + } + TEST_DONE(); +} |