diff options
Diffstat (limited to 'messagebus/src/tests/sendadapter/sendadapter.cpp')
-rw-r--r-- | messagebus/src/tests/sendadapter/sendadapter.cpp | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp new file mode 100644 index 00000000000..b25240acdac --- /dev/null +++ b/messagebus/src/tests/sendadapter/sendadapter.cpp @@ -0,0 +1,252 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("sendadapter_test"); + +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/testlib/receptor.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/messagebus/testlib/testserver.h> +#include <vespa/vespalib/testkit/testapp.h> + +using namespace mbus; + +//////////////////////////////////////////////////////////////////////////////// +// +// Setup +// +//////////////////////////////////////////////////////////////////////////////// + +class TestProtocol : public mbus::SimpleProtocol { +private: + mutable vespalib::Version _lastVersion; + +public: + typedef std::shared_ptr<TestProtocol> SP; + mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const { + _lastVersion = version; + return mbus::SimpleProtocol::encode(version, routable); + } + mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef blob) const { + _lastVersion = version; + return mbus::SimpleProtocol::decode(version, blob); + } + const vespalib::Version &getLastVersion() { return _lastVersion; } +}; + +class TestData { +public: + Slobrok _slobrok; + TestProtocol::SP _srcProtocol; + TestServer _srcServer; + SourceSession::UP _srcSession; + Receptor _srcHandler; + TestProtocol::SP _itrProtocol; + TestServer _itrServer; + IntermediateSession::UP _itrSession; + Receptor _itrHandler; + TestProtocol::SP _dstProtocol; + TestServer _dstServer; + DestinationSession::UP _dstSession; + Receptor _dstHandler; + +public: + TestData(); + bool start(); +}; + +class Test : public vespalib::TestApp { +private: + static const int TIMEOUT_SECS = 60; + + bool testVersionedSend(TestData &data, + const vespalib::Version &srcVersion, + const vespalib::Version &itrVersion, + const vespalib::Version &dstVersion); + void testSendAdapters(TestData &data); + +public: + int Main(); +}; + +TEST_APPHOOK(Test); + +TestData::TestData() : + _slobrok(), + _srcProtocol(new TestProtocol()), + _srcServer(MessageBusParams().setRetryPolicy(IRetryPolicy::SP()).addProtocol(_srcProtocol), + RPCNetworkParams().setSlobrokConfig(_slobrok.config())), + _srcSession(), + _srcHandler(), + _itrProtocol(new TestProtocol()), + _itrServer(MessageBusParams().addProtocol(_itrProtocol), + RPCNetworkParams().setIdentity(Identity("itr")).setSlobrokConfig(_slobrok.config())), + _itrSession(), + _itrHandler(), + _dstProtocol(new TestProtocol()), + _dstServer(MessageBusParams().addProtocol(_dstProtocol), + RPCNetworkParams().setIdentity(Identity("dst")).setSlobrokConfig(_slobrok.config())), + _dstSession(), + _dstHandler() +{ + // empty +} + +bool +TestData::start() +{ + _srcSession = _srcServer.mb.createSourceSession(SourceSessionParams().setReplyHandler(_srcHandler)); + if (_srcSession.get() == NULL) { + return false; + } + _itrSession = _itrServer.mb.createIntermediateSession(IntermediateSessionParams().setName("session").setMessageHandler(_itrHandler).setReplyHandler(_itrHandler)); + if (_itrSession.get() == NULL) { + return false; + } + _dstSession = _dstServer.mb.createDestinationSession(DestinationSessionParams().setName("session").setMessageHandler(_dstHandler)); + if (_dstSession.get() == NULL) { + return false; + } + if (!_srcServer.waitSlobrok("*/session", 2u)) { + return false; + } + return true; +} + +int +Test::Main() +{ + TEST_INIT("sendadapter_test"); + + TestData data; + ASSERT_TRUE(data.start()); + + testSendAdapters(data); TEST_FLUSH(); + + TEST_DONE(); +} + +//////////////////////////////////////////////////////////////////////////////// +// +// Tests +// +//////////////////////////////////////////////////////////////////////////////// + +void +Test::testSendAdapters(TestData &data) +{ + std::vector<vespalib::Version> versions; + versions.push_back(vespalib::Version(5, 0)); + versions.push_back(vespalib::Version(5, 1)); + + for (std::vector<vespalib::Version>::const_iterator srcVersion = versions.begin(); + srcVersion != versions.end(); ++srcVersion) + { + for (std::vector<vespalib::Version>::const_iterator itrVersion = versions.begin(); + itrVersion != versions.end(); ++itrVersion) + { + for (std::vector<vespalib::Version>::const_iterator dstVersion = versions.begin(); + dstVersion != versions.end(); ++dstVersion) + { + EXPECT_TRUE(testVersionedSend(data, *srcVersion, *itrVersion, *dstVersion)); + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// +// Utilities +// +//////////////////////////////////////////////////////////////////////////////// + +bool +Test::testVersionedSend(TestData &data, + const vespalib::Version &srcVersion, + const vespalib::Version &itrVersion, + const vespalib::Version &dstVersion) +{ + LOG(info, "Sending from %s through %s to %s.", + srcVersion.toString().c_str(), itrVersion.toString().c_str(), dstVersion.toString().c_str()); + data._srcServer.net.setVersion(srcVersion); + data._itrServer.net.setVersion(itrVersion); + data._dstServer.net.setVersion(dstVersion); + + Message::UP msg(new SimpleMessage("foo")); + msg->getTrace().setLevel(9); + if (!EXPECT_TRUE(data._srcSession->send(std::move(msg), Route::parse("itr/session dst/session")).isAccepted())) { + return false; + } + msg = data._itrHandler.getMessage(TIMEOUT_SECS); + if (!EXPECT_TRUE(msg.get() != NULL)) { + return false; + } + LOG(info, "Message version %s serialized at source.", + data._srcProtocol->getLastVersion().toString().c_str()); + vespalib::Version minVersion = std::min(srcVersion, itrVersion); + if (!EXPECT_TRUE(minVersion == data._srcProtocol->getLastVersion())) { + return false; + } + + LOG(info, "Message version %s reached intermediate.", + data._itrProtocol->getLastVersion().toString().c_str()); + if (!EXPECT_TRUE(minVersion == data._itrProtocol->getLastVersion())) { + return false; + } + data._itrSession->forward(std::move(msg)); + msg = data._dstHandler.getMessage(TIMEOUT_SECS); + if (!EXPECT_TRUE(msg.get() != NULL)) { + return false; + } + LOG(info, "Message version %s serialized at intermediate.", + data._itrProtocol->getLastVersion().toString().c_str()); + minVersion = std::min(itrVersion, dstVersion); + if (!EXPECT_TRUE(minVersion == data._itrProtocol->getLastVersion())) { + return false; + } + + LOG(info, "Message version %s reached destination.", + data._dstProtocol->getLastVersion().toString().c_str()); + if (!EXPECT_TRUE(minVersion == data._dstProtocol->getLastVersion())) { + return false; + } + Reply::UP reply(new SimpleReply("bar")); + reply->swapState(*msg); + data._dstSession->reply(std::move(reply)); + reply = data._itrHandler.getReply(); + if (!EXPECT_TRUE(reply.get() != NULL)) { + return false; + } + LOG(info, "Reply version %s serialized at destination.", + data._dstProtocol->getLastVersion().toString().c_str()); + if (!EXPECT_TRUE(minVersion == data._dstProtocol->getLastVersion())) { + return false; + } + + LOG(info, "Reply version %s reached intermediate.", + data._itrProtocol->getLastVersion().toString().c_str()); + if (!EXPECT_TRUE(minVersion == data._itrProtocol->getLastVersion())) { + return false; + } + data._itrSession->forward(std::move(reply)); + reply = data._srcHandler.getReply(); + if (!EXPECT_TRUE(reply.get() != NULL)) { + return false; + } + LOG(info, "Reply version %s serialized at intermediate.", + data._dstProtocol->getLastVersion().toString().c_str()); + minVersion = std::min(srcVersion, itrVersion); + if (!EXPECT_TRUE(minVersion == data._itrProtocol->getLastVersion())) { + return false; + } + + LOG(info, "Reply version %s reached source.", + data._srcProtocol->getLastVersion().toString().c_str()); + if (!EXPECT_TRUE(minVersion == data._srcProtocol->getLastVersion())) { + return false; + } + return true; +} |