aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/tests/messageordering
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /messagebus/src/tests/messageordering
Publish
Diffstat (limited to 'messagebus/src/tests/messageordering')
-rw-r--r--messagebus/src/tests/messageordering/.gitignore3
-rw-r--r--messagebus/src/tests/messageordering/CMakeLists.txt12
-rw-r--r--messagebus/src/tests/messageordering/DESC1
-rw-r--r--messagebus/src/tests/messageordering/FILES1
-rw-r--r--messagebus/src/tests/messageordering/messageordering.cpp178
5 files changed, 195 insertions, 0 deletions
diff --git a/messagebus/src/tests/messageordering/.gitignore b/messagebus/src/tests/messageordering/.gitignore
new file mode 100644
index 00000000000..1e4e97670de
--- /dev/null
+++ b/messagebus/src/tests/messageordering/.gitignore
@@ -0,0 +1,3 @@
+/.depend
+/Makefile
+messagebus_messageordering_test_app
diff --git a/messagebus/src/tests/messageordering/CMakeLists.txt b/messagebus/src/tests/messageordering/CMakeLists.txt
new file mode 100644
index 00000000000..b3af6386684
--- /dev/null
+++ b/messagebus/src/tests/messageordering/CMakeLists.txt
@@ -0,0 +1,12 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(messagebus_messageordering_test_app
+ SOURCES
+ messageordering.cpp
+ DEPENDS
+ messagebus_messagebus-test
+ messagebus
+)
+vespa_add_test(
+ NAME messagebus_messageordering_test_app
+ COMMAND messagebus_messageordering_test_app
+)
diff --git a/messagebus/src/tests/messageordering/DESC b/messagebus/src/tests/messageordering/DESC
new file mode 100644
index 00000000000..a4e636441ac
--- /dev/null
+++ b/messagebus/src/tests/messageordering/DESC
@@ -0,0 +1 @@
+messageordering test. Take a look at messageordering.cpp for details.
diff --git a/messagebus/src/tests/messageordering/FILES b/messagebus/src/tests/messageordering/FILES
new file mode 100644
index 00000000000..51c47a40211
--- /dev/null
+++ b/messagebus/src/tests/messageordering/FILES
@@ -0,0 +1 @@
+messageordering.cpp
diff --git a/messagebus/src/tests/messageordering/messageordering.cpp b/messagebus/src/tests/messageordering/messageordering.cpp
new file mode 100644
index 00000000000..97daee80d99
--- /dev/null
+++ b/messagebus/src/tests/messageordering/messageordering.cpp
@@ -0,0 +1,178 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP("messageordering_test");
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/sourcesession.h>
+#include <vespa/messagebus/destinationsession.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/messagebus/testlib/testserver.h>
+#include <vespa/messagebus/routing/routingspec.h>
+#include <vespa/messagebus/testlib/receptor.h>
+#include <vespa/messagebus/sourcesessionparams.h>
+#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simplereply.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <vespa/messagebus/errorcode.h>
+#include <vespa/vespalib/util/vstringfmt.h>
+#include <stdexcept>
+
+using namespace mbus;
+
+TEST_SETUP(Test);
+
+RoutingSpec
+getRouting()
+{
+ return RoutingSpec()
+ .addTable(RoutingTableSpec("Simple")
+ .addHop(HopSpec("dst", "test/dst/session"))
+ .addRoute(RouteSpec("test").addHop("dst")));
+}
+
+class MultiReceptor : public IMessageHandler
+{
+private:
+ vespalib::Monitor _mon;
+ DestinationSession* _destinationSession;
+ int _messageCounter;
+
+ MultiReceptor(const Receptor &);
+ MultiReceptor &operator=(const Receptor &);
+public:
+ MultiReceptor()
+ : _mon(),
+ _destinationSession(0),
+ _messageCounter(0)
+ {}
+ virtual void handleMessage(Message::UP msg)
+ {
+ SimpleMessage& simpleMsg(dynamic_cast<SimpleMessage&>(*msg));
+ LOG(spam, "Attempting to acquire lock for %s",
+ simpleMsg.getValue().c_str());
+
+ vespalib::MonitorGuard lock(_mon);
+
+ vespalib::string expected(vespalib::make_vespa_string("%d", _messageCounter));
+ LOG(debug, "Got message %p with %s, expecting %s",
+ msg.get(),
+ simpleMsg.getValue().c_str(),
+ expected.c_str());
+
+ SimpleReply::UP sr(new SimpleReply("test reply"));
+ msg->swapState(*sr);
+
+ if (simpleMsg.getValue() != expected) {
+ std::stringstream ss;
+ ss << "Received out-of-sequence message! Expected "
+ << expected
+ << ", but got "
+ << simpleMsg.getValue();
+ //LOG(warning, "%s", ss.str().c_str());
+ sr->addError(Error(ErrorCode::FATAL_ERROR, ss.str()));
+ }
+ sr->setValue(simpleMsg.getValue());
+
+ ++_messageCounter;
+ _destinationSession->reply(Reply::UP(sr.release()));
+ }
+ void setDestinationSession(DestinationSession& sess) {
+ _destinationSession = &sess;
+ }
+};
+
+class VerifyReplyReceptor : public IReplyHandler
+{
+ vespalib::Monitor _mon;
+ std::string _failure;
+ int _replyCount;
+public:
+ VerifyReplyReceptor()
+ : _mon(),
+ _failure(),
+ _replyCount(0)
+ {}
+ void handleReply(Reply::UP reply)
+ {
+ vespalib::MonitorGuard lock(_mon);
+ if (reply->hasErrors()) {
+ std::ostringstream ss;
+ ss << "Reply failed with "
+ << reply->getError(0).getMessage()
+ << "\n"
+ << reply->getTrace().toString();
+ if (_failure.empty()) {
+ _failure = ss.str();
+ }
+ LOG(warning, "%s", ss.str().c_str());
+ } else {
+ vespalib::string expected(vespalib::make_vespa_string("%d", _replyCount));
+ SimpleReply& simpleReply(static_cast<SimpleReply&>(*reply));
+ if (simpleReply.getValue() != expected) {
+ std::stringstream ss;
+ ss << "Received out-of-sequence reply! Expected "
+ << expected
+ << ", but got "
+ << simpleReply.getValue();
+ LOG(warning, "%s", ss.str().c_str());
+ if (_failure.empty()) {
+ _failure = ss.str();
+ }
+ }
+ }
+ ++_replyCount;
+ lock.broadcast();
+ }
+ void waitUntilDone(int waitForCount) const
+ {
+ vespalib::MonitorGuard lock(_mon);
+ while (_replyCount < waitForCount) {
+ lock.wait(1000);
+ }
+ }
+ const std::string& getFailure() const { return _failure; }
+};
+
+int
+Test::Main()
+{
+ TEST_INIT("messageordering_test");
+
+ Slobrok slobrok;
+ TestServer srcNet(Identity("test/src"), getRouting(), slobrok);
+ TestServer dstNet(Identity("test/dst"), getRouting(), slobrok);
+
+ VerifyReplyReceptor src;
+ MultiReceptor dst;
+
+ SourceSessionParams ssp;
+ ssp.setThrottlePolicy(IThrottlePolicy::SP());
+ ssp.setTimeout(400);
+ SourceSession::UP ss = srcNet.mb.createSourceSession(src, ssp);
+ DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst);
+ ASSERT_EQUAL(400u, ssp.getTimeout());
+
+ // wait for slobrok registration
+ ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session"));
+
+ // same message id for all messages in order to guarantee ordering
+ int commonMessageId = 42;
+
+ // send messages on client
+ const int messageCount = 10000;
+ for (int i = 0; i < messageCount; ++i) {
+ vespalib::string str(vespalib::make_vespa_string("%d", i));
+ //FastOS_Thread::Sleep(1);
+ SimpleMessage::UP msg(new SimpleMessage(str, true, commonMessageId));
+ msg->getTrace().setLevel(9);
+ //LOG(debug, "Sending message %p for %d", msg.get(), i);
+ ASSERT_EQUAL(uint32_t(ErrorCode::NONE),
+ ss->send(std::move(msg), "test").getError().getCode());
+ }
+ src.waitUntilDone(messageCount);
+
+ ASSERT_EQUAL(std::string(), src.getFailure());
+
+ TEST_DONE();
+}