summaryrefslogtreecommitdiffstats
path: root/messagebus/src/tests/context/context.cpp
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /messagebus/src/tests/context/context.cpp
Publish
Diffstat (limited to 'messagebus/src/tests/context/context.cpp')
-rw-r--r--messagebus/src/tests/context/context.cpp102
1 files changed, 102 insertions, 0 deletions
diff --git a/messagebus/src/tests/context/context.cpp b/messagebus/src/tests/context/context.cpp
new file mode 100644
index 00000000000..ccb136e7b81
--- /dev/null
+++ b/messagebus/src/tests/context/context.cpp
@@ -0,0 +1,102 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("context_test");
+
+#include <vespa/messagebus/destinationsession.h>
+#include <vespa/messagebus/intermediatesession.h>
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/routablequeue.h>
+#include <vespa/messagebus/routing/routingspec.h>
+#include <vespa/messagebus/sourcesession.h>
+#include <vespa/messagebus/sourcesessionparams.h>
+#include <vespa/messagebus/testlib/receptor.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simplereply.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <vespa/messagebus/testlib/testserver.h>
+#include <vespa/vespalib/testkit/testapp.h>
+
+using namespace mbus;
+
+struct Handler : public IMessageHandler
+{
+ DestinationSession::UP session;
+
+ Handler(MessageBus &mb) : session() {
+ session = mb.createDestinationSession("session", true, *this);
+ }
+ ~Handler() {
+ session.reset();
+ }
+ virtual void handleMessage(Message::UP msg) {
+ session->acknowledge(std::move(msg));
+ }
+};
+
+RoutingSpec getRouting() {
+ return RoutingSpec()
+ .addTable(RoutingTableSpec("Simple")
+ .addHop(HopSpec("test", "test/session"))
+ .addRoute(RouteSpec("test").addHop("test")));
+}
+
+TEST_SETUP(Test);
+
+int
+Test::Main()
+{
+ TEST_INIT("context_test");
+
+ Slobrok slobrok;
+ TestServer src(Identity(""), getRouting(), slobrok);
+ TestServer dst(Identity("test"), getRouting(), slobrok);
+ Handler handler(dst.mb);
+
+ ASSERT_TRUE(src.waitSlobrok("test/session"));
+
+ RoutableQueue queue;
+ SourceSessionParams params;
+ params.setThrottlePolicy(IThrottlePolicy::SP());
+ SourceSession::UP ss = src.mb.createSourceSession(queue, params);
+
+ {
+ Message::UP msg(new SimpleMessage("test", true, 1));
+ msg->setContext(Context((uint64_t)10));
+ ss->send(std::move(msg), "test");
+ }
+ {
+ Message::UP msg(new SimpleMessage("test", true, 1));
+ msg->setContext(Context((uint64_t)20));
+ ss->send(std::move(msg), "test");
+ }
+ {
+ Message::UP msg(new SimpleMessage("test", true, 1));
+ msg->setContext(Context((uint64_t)30));
+ ss->send(std::move(msg), "test");
+ }
+ for (uint32_t i = 0; i < 1000; ++i) {
+ if (queue.size() == 3) {
+ break;
+ }
+ FastOS_Thread::Sleep(10);
+ }
+ EXPECT_EQUAL(queue.size(), 3u);
+ {
+ Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release());
+ ASSERT_TRUE(reply.get() != 0);
+ EXPECT_EQUAL(reply->getContext().value.UINT64, 10u);
+ }
+ {
+ Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release());
+ ASSERT_TRUE(reply.get() != 0);
+ EXPECT_EQUAL(reply->getContext().value.UINT64, 20u);
+ }
+ {
+ Reply::UP reply = Reply::UP((Reply*)queue.dequeue(0).release());
+ ASSERT_TRUE(reply.get() != 0);
+ EXPECT_EQUAL(reply->getContext().value.UINT64, 30u);
+ }
+ TEST_DONE();
+}