diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /messagebus/src/tests/messageordering |
Publish
Diffstat (limited to 'messagebus/src/tests/messageordering')
-rw-r--r-- | messagebus/src/tests/messageordering/.gitignore | 3 | ||||
-rw-r--r-- | messagebus/src/tests/messageordering/CMakeLists.txt | 12 | ||||
-rw-r--r-- | messagebus/src/tests/messageordering/DESC | 1 | ||||
-rw-r--r-- | messagebus/src/tests/messageordering/FILES | 1 | ||||
-rw-r--r-- | messagebus/src/tests/messageordering/messageordering.cpp | 178 |
5 files changed, 195 insertions, 0 deletions
diff --git a/messagebus/src/tests/messageordering/.gitignore b/messagebus/src/tests/messageordering/.gitignore new file mode 100644 index 00000000000..1e4e97670de --- /dev/null +++ b/messagebus/src/tests/messageordering/.gitignore @@ -0,0 +1,3 @@ +/.depend +/Makefile +messagebus_messageordering_test_app diff --git a/messagebus/src/tests/messageordering/CMakeLists.txt b/messagebus/src/tests/messageordering/CMakeLists.txt new file mode 100644 index 00000000000..b3af6386684 --- /dev/null +++ b/messagebus/src/tests/messageordering/CMakeLists.txt @@ -0,0 +1,12 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(messagebus_messageordering_test_app + SOURCES + messageordering.cpp + DEPENDS + messagebus_messagebus-test + messagebus +) +vespa_add_test( + NAME messagebus_messageordering_test_app + COMMAND messagebus_messageordering_test_app +) diff --git a/messagebus/src/tests/messageordering/DESC b/messagebus/src/tests/messageordering/DESC new file mode 100644 index 00000000000..a4e636441ac --- /dev/null +++ b/messagebus/src/tests/messageordering/DESC @@ -0,0 +1 @@ +messageordering test. Take a look at messageordering.cpp for details. diff --git a/messagebus/src/tests/messageordering/FILES b/messagebus/src/tests/messageordering/FILES new file mode 100644 index 00000000000..51c47a40211 --- /dev/null +++ b/messagebus/src/tests/messageordering/FILES @@ -0,0 +1 @@ +messageordering.cpp diff --git a/messagebus/src/tests/messageordering/messageordering.cpp b/messagebus/src/tests/messageordering/messageordering.cpp new file mode 100644 index 00000000000..97daee80d99 --- /dev/null +++ b/messagebus/src/tests/messageordering/messageordering.cpp @@ -0,0 +1,178 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("messageordering_test"); +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/sourcesession.h> +#include <vespa/messagebus/destinationsession.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/testserver.h> +#include <vespa/messagebus/routing/routingspec.h> +#include <vespa/messagebus/testlib/receptor.h> +#include <vespa/messagebus/sourcesessionparams.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/errorcode.h> +#include <vespa/vespalib/util/vstringfmt.h> +#include <stdexcept> + +using namespace mbus; + +TEST_SETUP(Test); + +RoutingSpec +getRouting() +{ + return RoutingSpec() + .addTable(RoutingTableSpec("Simple") + .addHop(HopSpec("dst", "test/dst/session")) + .addRoute(RouteSpec("test").addHop("dst"))); +} + +class MultiReceptor : public IMessageHandler +{ +private: + vespalib::Monitor _mon; + DestinationSession* _destinationSession; + int _messageCounter; + + MultiReceptor(const Receptor &); + MultiReceptor &operator=(const Receptor &); +public: + MultiReceptor() + : _mon(), + _destinationSession(0), + _messageCounter(0) + {} + virtual void handleMessage(Message::UP msg) + { + SimpleMessage& simpleMsg(dynamic_cast<SimpleMessage&>(*msg)); + LOG(spam, "Attempting to acquire lock for %s", + simpleMsg.getValue().c_str()); + + vespalib::MonitorGuard lock(_mon); + + vespalib::string expected(vespalib::make_vespa_string("%d", _messageCounter)); + LOG(debug, "Got message %p with %s, expecting %s", + msg.get(), + simpleMsg.getValue().c_str(), + expected.c_str()); + + SimpleReply::UP sr(new SimpleReply("test reply")); + msg->swapState(*sr); + + if (simpleMsg.getValue() != expected) { + std::stringstream ss; + ss << "Received out-of-sequence message! Expected " + << expected + << ", but got " + << simpleMsg.getValue(); + //LOG(warning, "%s", ss.str().c_str()); + sr->addError(Error(ErrorCode::FATAL_ERROR, ss.str())); + } + sr->setValue(simpleMsg.getValue()); + + ++_messageCounter; + _destinationSession->reply(Reply::UP(sr.release())); + } + void setDestinationSession(DestinationSession& sess) { + _destinationSession = &sess; + } +}; + +class VerifyReplyReceptor : public IReplyHandler +{ + vespalib::Monitor _mon; + std::string _failure; + int _replyCount; +public: + VerifyReplyReceptor() + : _mon(), + _failure(), + _replyCount(0) + {} + void handleReply(Reply::UP reply) + { + vespalib::MonitorGuard lock(_mon); + if (reply->hasErrors()) { + std::ostringstream ss; + ss << "Reply failed with " + << reply->getError(0).getMessage() + << "\n" + << reply->getTrace().toString(); + if (_failure.empty()) { + _failure = ss.str(); + } + LOG(warning, "%s", ss.str().c_str()); + } else { + vespalib::string expected(vespalib::make_vespa_string("%d", _replyCount)); + SimpleReply& simpleReply(static_cast<SimpleReply&>(*reply)); + if (simpleReply.getValue() != expected) { + std::stringstream ss; + ss << "Received out-of-sequence reply! Expected " + << expected + << ", but got " + << simpleReply.getValue(); + LOG(warning, "%s", ss.str().c_str()); + if (_failure.empty()) { + _failure = ss.str(); + } + } + } + ++_replyCount; + lock.broadcast(); + } + void waitUntilDone(int waitForCount) const + { + vespalib::MonitorGuard lock(_mon); + while (_replyCount < waitForCount) { + lock.wait(1000); + } + } + const std::string& getFailure() const { return _failure; } +}; + +int +Test::Main() +{ + TEST_INIT("messageordering_test"); + + Slobrok slobrok; + TestServer srcNet(Identity("test/src"), getRouting(), slobrok); + TestServer dstNet(Identity("test/dst"), getRouting(), slobrok); + + VerifyReplyReceptor src; + MultiReceptor dst; + + SourceSessionParams ssp; + ssp.setThrottlePolicy(IThrottlePolicy::SP()); + ssp.setTimeout(400); + SourceSession::UP ss = srcNet.mb.createSourceSession(src, ssp); + DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst); + ASSERT_EQUAL(400u, ssp.getTimeout()); + + // wait for slobrok registration + ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session")); + + // same message id for all messages in order to guarantee ordering + int commonMessageId = 42; + + // send messages on client + const int messageCount = 10000; + for (int i = 0; i < messageCount; ++i) { + vespalib::string str(vespalib::make_vespa_string("%d", i)); + //FastOS_Thread::Sleep(1); + SimpleMessage::UP msg(new SimpleMessage(str, true, commonMessageId)); + msg->getTrace().setLevel(9); + //LOG(debug, "Sending message %p for %d", msg.get(), i); + ASSERT_EQUAL(uint32_t(ErrorCode::NONE), + ss->send(std::move(msg), "test").getError().getCode()); + } + src.waitUntilDone(messageCount); + + ASSERT_EQUAL(std::string(), src.getFailure()); + + TEST_DONE(); +} |